Home | History | Annotate | Download | only in source
      1 /*
      2  *  Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/system_wrappers/include/data_log.h"
     12 
     13 #include <assert.h>
     14 
     15 #include <algorithm>
     16 #include <list>
     17 
     18 #include "webrtc/system_wrappers/include/critical_section_wrapper.h"
     19 #include "webrtc/system_wrappers/include/event_wrapper.h"
     20 #include "webrtc/system_wrappers/include/file_wrapper.h"
     21 #include "webrtc/system_wrappers/include/rw_lock_wrapper.h"
     22 
     23 namespace webrtc {
     24 
     25 DataLogImpl::CritSectScopedPtr DataLogImpl::crit_sect_(
     26   CriticalSectionWrapper::CreateCriticalSection());
     27 
     28 DataLogImpl* DataLogImpl::instance_ = NULL;
     29 
     30 // A Row contains cells, which are indexed by the column names as std::string.
     31 // The string index is treated in a case sensitive way.
     32 class Row {
     33  public:
     34   Row();
     35   ~Row();
     36 
     37   // Inserts a Container into the cell of the column specified with
     38   // column_name.
     39   // column_name is treated in a case sensitive way.
     40   int InsertCell(const std::string& column_name,
     41                  const Container* value_container);
     42 
     43   // Converts the value at the column specified by column_name to a string
     44   // stored in value_string.
     45   // column_name is treated in a case sensitive way.
     46   void ToString(const std::string& column_name, std::string* value_string);
     47 
     48  private:
     49   // Collection of containers indexed by column name as std::string
     50   typedef std::map<std::string, const Container*> CellMap;
     51 
     52   CellMap                   cells_;
     53   CriticalSectionWrapper*   cells_lock_;
     54 };
     55 
     56 // A LogTable contains multiple rows, where only the latest row is active for
     57 // editing. The rows are defined by the ColumnMap, which contains the name of
     58 // each column and the length of the column (1 for one-value-columns and greater
     59 // than 1 for multi-value-columns).
     60 class LogTable {
     61  public:
     62   LogTable();
     63   ~LogTable();
     64 
     65   // Adds the column with name column_name to the table. The column will be a
     66   // multi-value-column if multi_value_length is greater than 1.
     67   // column_name is treated in a case sensitive way.
     68   int AddColumn(const std::string& column_name, int multi_value_length);
     69 
     70   // Buffers the current row while it is waiting to be written to file,
     71   // which is done by a call to Flush(). A new row is available when the
     72   // function returns
     73   void NextRow();
     74 
     75   // Inserts a Container into the cell of the column specified with
     76   // column_name.
     77   // column_name is treated in a case sensitive way.
     78   int InsertCell(const std::string& column_name,
     79                  const Container* value_container);
     80 
     81   // Creates a log file, named as specified in the string file_name, to
     82   // where the table will be written when calling Flush().
     83   int CreateLogFile(const std::string& file_name);
     84 
     85   // Write all complete rows to file.
     86   // May not be called by two threads simultaneously (doing so may result in
     87   // a race condition). Will be called by the file_writer_thread_ when that
     88   // thread is running.
     89   void Flush();
     90 
     91  private:
     92   // Collection of multi_value_lengths indexed by column name as std::string
     93   typedef std::map<std::string, int> ColumnMap;
     94   typedef std::list<Row*> RowList;
     95 
     96   ColumnMap               columns_;
     97   RowList                 rows_[2];
     98   RowList*                rows_history_;
     99   RowList*                rows_flush_;
    100   Row*                    current_row_;
    101   FileWrapper*            file_;
    102   bool                    write_header_;
    103   CriticalSectionWrapper* table_lock_;
    104 };
    105 
    106 Row::Row()
    107   : cells_(),
    108     cells_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
    109 }
    110 
    111 Row::~Row() {
    112   for (CellMap::iterator it = cells_.begin(); it != cells_.end();) {
    113     delete it->second;
    114     // For maps all iterators (except the erased) are valid after an erase
    115     cells_.erase(it++);
    116   }
    117   delete cells_lock_;
    118 }
    119 
    120 int Row::InsertCell(const std::string& column_name,
    121                     const Container* value_container) {
    122   CriticalSectionScoped synchronize(cells_lock_);
    123   assert(cells_.count(column_name) == 0);
    124   if (cells_.count(column_name) > 0)
    125     return -1;
    126   cells_[column_name] = value_container;
    127   return 0;
    128 }
    129 
    130 void Row::ToString(const std::string& column_name,
    131                    std::string* value_string) {
    132   CriticalSectionScoped synchronize(cells_lock_);
    133   const Container* container = cells_[column_name];
    134   if (container == NULL) {
    135     *value_string = "NaN,";
    136     return;
    137   }
    138   container->ToString(value_string);
    139 }
    140 
    141 LogTable::LogTable()
    142   : columns_(),
    143     rows_(),
    144     rows_history_(&rows_[0]),
    145     rows_flush_(&rows_[1]),
    146     current_row_(new Row),
    147     file_(FileWrapper::Create()),
    148     write_header_(true),
    149     table_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
    150 }
    151 
    152 LogTable::~LogTable() {
    153   for (RowList::iterator row_it = rows_history_->begin();
    154        row_it != rows_history_->end();) {
    155     delete *row_it;
    156     row_it = rows_history_->erase(row_it);
    157   }
    158   for (ColumnMap::iterator col_it = columns_.begin();
    159        col_it != columns_.end();) {
    160     // For maps all iterators (except the erased) are valid after an erase
    161     columns_.erase(col_it++);
    162   }
    163   if (file_ != NULL) {
    164     file_->Flush();
    165     file_->CloseFile();
    166     delete file_;
    167   }
    168   delete current_row_;
    169   delete table_lock_;
    170 }
    171 
    172 int LogTable::AddColumn(const std::string& column_name,
    173                         int multi_value_length) {
    174   assert(multi_value_length > 0);
    175   if (!write_header_) {
    176     // It's not allowed to add new columns after the header
    177     // has been written.
    178     assert(false);
    179     return -1;
    180   } else {
    181     CriticalSectionScoped synchronize(table_lock_);
    182     if (write_header_)
    183       columns_[column_name] = multi_value_length;
    184     else
    185       return -1;
    186   }
    187   return 0;
    188 }
    189 
    190 void LogTable::NextRow() {
    191   CriticalSectionScoped sync_rows(table_lock_);
    192   rows_history_->push_back(current_row_);
    193   current_row_ = new Row;
    194 }
    195 
    196 int LogTable::InsertCell(const std::string& column_name,
    197                          const Container* value_container) {
    198   CriticalSectionScoped synchronize(table_lock_);
    199   assert(columns_.count(column_name) > 0);
    200   if (columns_.count(column_name) == 0)
    201     return -1;
    202   return current_row_->InsertCell(column_name, value_container);
    203 }
    204 
    205 int LogTable::CreateLogFile(const std::string& file_name) {
    206   if (file_name.length() == 0)
    207     return -1;
    208   if (file_->Open())
    209     return -1;
    210   file_->OpenFile(file_name.c_str(),
    211                   false,  // Open with read/write permissions
    212                   false,  // Don't wraparound and write at the beginning when
    213                           // the file is full
    214                   true);  // Open as a text file
    215   if (file_ == NULL)
    216     return -1;
    217   return 0;
    218 }
    219 
    220 void LogTable::Flush() {
    221   ColumnMap::iterator column_it;
    222   bool commit_header = false;
    223   if (write_header_) {
    224     CriticalSectionScoped synchronize(table_lock_);
    225     if (write_header_) {
    226       commit_header = true;
    227       write_header_ = false;
    228     }
    229   }
    230   if (commit_header) {
    231     for (column_it = columns_.begin();
    232          column_it != columns_.end(); ++column_it) {
    233       if (column_it->second > 1) {
    234         file_->WriteText("%s[%u],", column_it->first.c_str(),
    235                          column_it->second);
    236         for (int i = 1; i < column_it->second; ++i)
    237           file_->WriteText(",");
    238       } else {
    239         file_->WriteText("%s,", column_it->first.c_str());
    240       }
    241     }
    242     if (columns_.size() > 0)
    243       file_->WriteText("\n");
    244   }
    245 
    246   // Swap the list used for flushing with the list containing the row history
    247   // and clear the history. We also create a local pointer to the new
    248   // list used for flushing to avoid race conditions if another thread
    249   // calls this function while we are writing.
    250   // We don't want to block the list while we're writing to file.
    251   {
    252     CriticalSectionScoped synchronize(table_lock_);
    253     RowList* tmp = rows_flush_;
    254     rows_flush_ = rows_history_;
    255     rows_history_ = tmp;
    256     rows_history_->clear();
    257   }
    258 
    259   // Write all complete rows to file and delete them
    260   for (RowList::iterator row_it = rows_flush_->begin();
    261        row_it != rows_flush_->end();) {
    262     for (column_it = columns_.begin();
    263          column_it != columns_.end(); ++column_it) {
    264       std::string row_string;
    265       (*row_it)->ToString(column_it->first, &row_string);
    266       file_->WriteText("%s", row_string.c_str());
    267     }
    268     if (columns_.size() > 0)
    269       file_->WriteText("\n");
    270     delete *row_it;
    271     row_it = rows_flush_->erase(row_it);
    272   }
    273 }
    274 
    275 int DataLog::CreateLog() {
    276   return DataLogImpl::CreateLog();
    277 }
    278 
    279 void DataLog::ReturnLog() {
    280   return DataLogImpl::ReturnLog();
    281 }
    282 
    283 std::string DataLog::Combine(const std::string& table_name, int table_id) {
    284   std::stringstream ss;
    285   std::string combined_id = table_name;
    286   std::string number_suffix;
    287   ss << "_" << table_id;
    288   ss >> number_suffix;
    289   combined_id += number_suffix;
    290   std::transform(combined_id.begin(), combined_id.end(), combined_id.begin(),
    291                  ::tolower);
    292   return combined_id;
    293 }
    294 
    295 int DataLog::AddTable(const std::string& table_name) {
    296   DataLogImpl* data_log = DataLogImpl::StaticInstance();
    297   if (data_log == NULL)
    298     return -1;
    299   return data_log->AddTable(table_name);
    300 }
    301 
    302 int DataLog::AddColumn(const std::string& table_name,
    303                        const std::string& column_name,
    304                        int multi_value_length) {
    305   DataLogImpl* data_log = DataLogImpl::StaticInstance();
    306   if (data_log == NULL)
    307     return -1;
    308   return data_log->DataLogImpl::StaticInstance()->AddColumn(table_name,
    309                                                             column_name,
    310                                                             multi_value_length);
    311 }
    312 
    313 int DataLog::NextRow(const std::string& table_name) {
    314   DataLogImpl* data_log = DataLogImpl::StaticInstance();
    315   if (data_log == NULL)
    316     return -1;
    317   return data_log->DataLogImpl::StaticInstance()->NextRow(table_name);
    318 }
    319 
    320 DataLogImpl::DataLogImpl()
    321     : counter_(1),
    322       tables_(),
    323       flush_event_(EventWrapper::Create()),
    324       file_writer_thread_(
    325           new rtc::PlatformThread(DataLogImpl::Run, instance_, "DataLog")),
    326       tables_lock_(RWLockWrapper::CreateRWLock()) {}
    327 
    328 DataLogImpl::~DataLogImpl() {
    329   StopThread();
    330   Flush();  // Write any remaining rows
    331   delete flush_event_;
    332   for (TableMap::iterator it = tables_.begin(); it != tables_.end();) {
    333     delete static_cast<LogTable*>(it->second);
    334     // For maps all iterators (except the erased) are valid after an erase
    335     tables_.erase(it++);
    336   }
    337   delete tables_lock_;
    338 }
    339 
    340 int DataLogImpl::CreateLog() {
    341   CriticalSectionScoped synchronize(crit_sect_.get());
    342   if (instance_ == NULL) {
    343     instance_ = new DataLogImpl();
    344     return instance_->Init();
    345   } else {
    346     ++instance_->counter_;
    347   }
    348   return 0;
    349 }
    350 
    351 int DataLogImpl::Init() {
    352   file_writer_thread_->Start();
    353   file_writer_thread_->SetPriority(rtc::kHighestPriority);
    354   return 0;
    355 }
    356 
    357 DataLogImpl* DataLogImpl::StaticInstance() {
    358   return instance_;
    359 }
    360 
    361 void DataLogImpl::ReturnLog() {
    362   CriticalSectionScoped synchronize(crit_sect_.get());
    363   if (instance_ && instance_->counter_ > 1) {
    364     --instance_->counter_;
    365     return;
    366   }
    367   delete instance_;
    368   instance_ = NULL;
    369 }
    370 
    371 int DataLogImpl::AddTable(const std::string& table_name) {
    372   WriteLockScoped synchronize(*tables_lock_);
    373   // Make sure we don't add a table which already exists
    374   if (tables_.count(table_name) > 0)
    375     return -1;
    376   tables_[table_name] = new LogTable();
    377   if (tables_[table_name]->CreateLogFile(table_name + ".txt") == -1)
    378     return -1;
    379   return 0;
    380 }
    381 
    382 int DataLogImpl::AddColumn(const std::string& table_name,
    383                            const std::string& column_name,
    384                            int multi_value_length) {
    385   ReadLockScoped synchronize(*tables_lock_);
    386   if (tables_.count(table_name) == 0)
    387     return -1;
    388   return tables_[table_name]->AddColumn(column_name, multi_value_length);
    389 }
    390 
    391 int DataLogImpl::InsertCell(const std::string& table_name,
    392                             const std::string& column_name,
    393                             const Container* value_container) {
    394   ReadLockScoped synchronize(*tables_lock_);
    395   assert(tables_.count(table_name) > 0);
    396   if (tables_.count(table_name) == 0)
    397     return -1;
    398   return tables_[table_name]->InsertCell(column_name, value_container);
    399 }
    400 
    401 int DataLogImpl::NextRow(const std::string& table_name) {
    402   ReadLockScoped synchronize(*tables_lock_);
    403   if (tables_.count(table_name) == 0)
    404     return -1;
    405   tables_[table_name]->NextRow();
    406   // Signal a complete row
    407   flush_event_->Set();
    408   return 0;
    409 }
    410 
    411 void DataLogImpl::Flush() {
    412   ReadLockScoped synchronize(*tables_lock_);
    413   for (TableMap::iterator it = tables_.begin(); it != tables_.end(); ++it) {
    414     it->second->Flush();
    415   }
    416 }
    417 
    418 bool DataLogImpl::Run(void* obj) {
    419   static_cast<DataLogImpl*>(obj)->Process();
    420   return true;
    421 }
    422 
    423 void DataLogImpl::Process() {
    424   // Wait for a row to be complete
    425   flush_event_->Wait(WEBRTC_EVENT_INFINITE);
    426   Flush();
    427 }
    428 
    429 void DataLogImpl::StopThread() {
    430   flush_event_->Set();
    431   file_writer_thread_->Stop();
    432 }
    433 
    434 }  // namespace webrtc
    435