Home | History | Annotate | Download | only in source
      1 /*
      2  *  Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "data_log.h"
     12 
     13 #include <assert.h>
     14 
     15 #include <algorithm>
     16 #include <list>
     17 
     18 #include "critical_section_wrapper.h"
     19 #include "event_wrapper.h"
     20 #include "file_wrapper.h"
     21 #include "rw_lock_wrapper.h"
     22 #include "thread_wrapper.h"
     23 
     24 namespace webrtc {
     25 
     26 DataLogImpl::CritSectScopedPtr DataLogImpl::crit_sect_(
     27   CriticalSectionWrapper::CreateCriticalSection());
     28 
     29 DataLogImpl* DataLogImpl::instance_ = NULL;
     30 
     31 // A Row contains cells, which are indexed by the column names as std::string.
     32 // The string index is treated in a case sensitive way.
     33 class Row {
     34  public:
     35   Row();
     36   ~Row();
     37 
     38   // Inserts a Container into the cell of the column specified with
     39   // column_name.
     40   // column_name is treated in a case sensitive way.
     41   int InsertCell(const std::string& column_name,
     42                  const Container* value_container);
     43 
     44   // Converts the value at the column specified by column_name to a string
     45   // stored in value_string.
     46   // column_name is treated in a case sensitive way.
     47   void ToString(const std::string& column_name, std::string* value_string);
     48 
     49  private:
     50   // Collection of containers indexed by column name as std::string
     51   typedef std::map<std::string, const Container*> CellMap;
     52 
     53   CellMap                   cells_;
     54   CriticalSectionWrapper*   cells_lock_;
     55 };
     56 
     57 // A LogTable contains multiple rows, where only the latest row is active for
     58 // editing. The rows are defined by the ColumnMap, which contains the name of
     59 // each column and the length of the column (1 for one-value-columns and greater
     60 // than 1 for multi-value-columns).
     61 class LogTable {
     62  public:
     63   LogTable();
     64   ~LogTable();
     65 
     66   // Adds the column with name column_name to the table. The column will be a
     67   // multi-value-column if multi_value_length is greater than 1.
     68   // column_name is treated in a case sensitive way.
     69   int AddColumn(const std::string& column_name, int multi_value_length);
     70 
     71   // Buffers the current row while it is waiting to be written to file,
     72   // which is done by a call to Flush(). A new row is available when the
     73   // function returns
     74   void NextRow();
     75 
     76   // Inserts a Container into the cell of the column specified with
     77   // column_name.
     78   // column_name is treated in a case sensitive way.
     79   int InsertCell(const std::string& column_name,
     80                  const Container* value_container);
     81 
     82   // Creates a log file, named as specified in the string file_name, to
     83   // where the table will be written when calling Flush().
     84   int CreateLogFile(const std::string& file_name);
     85 
     86   // Write all complete rows to file.
     87   // May not be called by two threads simultaneously (doing so may result in
     88   // a race condition). Will be called by the file_writer_thread_ when that
     89   // thread is running.
     90   void Flush();
     91 
     92  private:
     93   // Collection of multi_value_lengths indexed by column name as std::string
     94   typedef std::map<std::string, int> ColumnMap;
     95   typedef std::list<Row*> RowList;
     96 
     97   ColumnMap               columns_;
     98   RowList                 rows_[2];
     99   RowList*                rows_history_;
    100   RowList*                rows_flush_;
    101   Row*                    current_row_;
    102   FileWrapper*            file_;
    103   bool                    write_header_;
    104   CriticalSectionWrapper* table_lock_;
    105 };
    106 
    107 Row::Row()
    108   : cells_(),
    109     cells_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
    110 }
    111 
    112 Row::~Row() {
    113   for (CellMap::iterator it = cells_.begin(); it != cells_.end();) {
    114     delete it->second;
    115     // For maps all iterators (except the erased) are valid after an erase
    116     cells_.erase(it++);
    117   }
    118   delete cells_lock_;
    119 }
    120 
    121 int Row::InsertCell(const std::string& column_name,
    122                     const Container* value_container) {
    123   CriticalSectionScoped synchronize(cells_lock_);
    124   assert(cells_.count(column_name) == 0);
    125   if (cells_.count(column_name) > 0)
    126     return -1;
    127   cells_[column_name] = value_container;
    128   return 0;
    129 }
    130 
    131 void Row::ToString(const std::string& column_name,
    132                    std::string* value_string) {
    133   CriticalSectionScoped synchronize(cells_lock_);
    134   const Container* container = cells_[column_name];
    135   if (container == NULL) {
    136     *value_string = "NaN,";
    137     return;
    138   }
    139   container->ToString(value_string);
    140 }
    141 
    142 LogTable::LogTable()
    143   : columns_(),
    144     rows_(),
    145     rows_history_(&rows_[0]),
    146     rows_flush_(&rows_[1]),
    147     current_row_(new Row),
    148     file_(FileWrapper::Create()),
    149     write_header_(true),
    150     table_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
    151 }
    152 
    153 LogTable::~LogTable() {
    154   for (RowList::iterator row_it = rows_history_->begin();
    155        row_it != rows_history_->end();) {
    156     delete *row_it;
    157     row_it = rows_history_->erase(row_it);
    158   }
    159   for (ColumnMap::iterator col_it = columns_.begin();
    160        col_it != columns_.end();) {
    161     // For maps all iterators (except the erased) are valid after an erase
    162     columns_.erase(col_it++);
    163   }
    164   if (file_ != NULL) {
    165     file_->Flush();
    166     file_->CloseFile();
    167     delete file_;
    168   }
    169   delete current_row_;
    170   delete table_lock_;
    171 }
    172 
    173 int LogTable::AddColumn(const std::string& column_name,
    174                         int multi_value_length) {
    175   assert(multi_value_length > 0);
    176   if (!write_header_) {
    177     // It's not allowed to add new columns after the header
    178     // has been written.
    179     assert(false);
    180     return -1;
    181   } else {
    182     CriticalSectionScoped synchronize(table_lock_);
    183     if (write_header_)
    184       columns_[column_name] = multi_value_length;
    185     else
    186       return -1;
    187   }
    188   return 0;
    189 }
    190 
    191 void LogTable::NextRow() {
    192   CriticalSectionScoped sync_rows(table_lock_);
    193   rows_history_->push_back(current_row_);
    194   current_row_ = new Row;
    195 }
    196 
    197 int LogTable::InsertCell(const std::string& column_name,
    198                          const Container* value_container) {
    199   CriticalSectionScoped synchronize(table_lock_);
    200   assert(columns_.count(column_name) > 0);
    201   if (columns_.count(column_name) == 0)
    202     return -1;
    203   return current_row_->InsertCell(column_name, value_container);
    204 }
    205 
    206 int LogTable::CreateLogFile(const std::string& file_name) {
    207   if (file_name.length() == 0)
    208     return -1;
    209   if (file_->Open())
    210     return -1;
    211   file_->OpenFile(file_name.c_str(),
    212                   false,  // Open with read/write permissions
    213                   false,  // Don't wraparound and write at the beginning when
    214                           // the file is full
    215                   true);  // Open as a text file
    216   if (file_ == NULL)
    217     return -1;
    218   return 0;
    219 }
    220 
    221 void LogTable::Flush() {
    222   ColumnMap::iterator column_it;
    223   bool commit_header = false;
    224   if (write_header_) {
    225     CriticalSectionScoped synchronize(table_lock_);
    226     if (write_header_) {
    227       commit_header = true;
    228       write_header_ = false;
    229     }
    230   }
    231   if (commit_header) {
    232     for (column_it = columns_.begin();
    233          column_it != columns_.end(); ++column_it) {
    234       if (column_it->second > 1) {
    235         file_->WriteText("%s[%u],", column_it->first.c_str(),
    236                          column_it->second);
    237         for (int i = 1; i < column_it->second; ++i)
    238           file_->WriteText(",");
    239       } else {
    240         file_->WriteText("%s,", column_it->first.c_str());
    241       }
    242     }
    243     if (columns_.size() > 0)
    244       file_->WriteText("\n");
    245   }
    246 
    247   // Swap the list used for flushing with the list containing the row history
    248   // and clear the history. We also create a local pointer to the new
    249   // list used for flushing to avoid race conditions if another thread
    250   // calls this function while we are writing.
    251   // We don't want to block the list while we're writing to file.
    252   {
    253     CriticalSectionScoped synchronize(table_lock_);
    254     RowList* tmp = rows_flush_;
    255     rows_flush_ = rows_history_;
    256     rows_history_ = tmp;
    257     rows_history_->clear();
    258   }
    259 
    260   // Write all complete rows to file and delete them
    261   for (RowList::iterator row_it = rows_flush_->begin();
    262        row_it != rows_flush_->end();) {
    263     for (column_it = columns_.begin();
    264          column_it != columns_.end(); ++column_it) {
    265       std::string row_string;
    266       (*row_it)->ToString(column_it->first, &row_string);
    267       file_->WriteText("%s", row_string.c_str());
    268     }
    269     if (columns_.size() > 0)
    270       file_->WriteText("\n");
    271     delete *row_it;
    272     row_it = rows_flush_->erase(row_it);
    273   }
    274 }
    275 
    276 int DataLog::CreateLog() {
    277   return DataLogImpl::CreateLog();
    278 }
    279 
    280 void DataLog::ReturnLog() {
    281   return DataLogImpl::ReturnLog();
    282 }
    283 
    284 std::string DataLog::Combine(const std::string& table_name, int table_id) {
    285   std::stringstream ss;
    286   std::string combined_id = table_name;
    287   std::string number_suffix;
    288   ss << "_" << table_id;
    289   ss >> number_suffix;
    290   combined_id += number_suffix;
    291   std::transform(combined_id.begin(), combined_id.end(), combined_id.begin(),
    292                  ::tolower);
    293   return combined_id;
    294 }
    295 
    296 int DataLog::AddTable(const std::string& table_name) {
    297   DataLogImpl* data_log = DataLogImpl::StaticInstance();
    298   if (data_log == NULL)
    299     return -1;
    300   return data_log->AddTable(table_name);
    301 }
    302 
    303 int DataLog::AddColumn(const std::string& table_name,
    304                        const std::string& column_name,
    305                        int multi_value_length) {
    306   DataLogImpl* data_log = DataLogImpl::StaticInstance();
    307   if (data_log == NULL)
    308     return -1;
    309   return data_log->DataLogImpl::StaticInstance()->AddColumn(table_name,
    310                                                             column_name,
    311                                                             multi_value_length);
    312 }
    313 
    314 int DataLog::NextRow(const std::string& table_name) {
    315   DataLogImpl* data_log = DataLogImpl::StaticInstance();
    316   if (data_log == NULL)
    317     return -1;
    318   return data_log->DataLogImpl::StaticInstance()->NextRow(table_name);
    319 }
    320 
    321 DataLogImpl::DataLogImpl()
    322   : counter_(1),
    323     tables_(),
    324     flush_event_(EventWrapper::Create()),
    325     file_writer_thread_(NULL),
    326     tables_lock_(RWLockWrapper::CreateRWLock()) {
    327 }
    328 
    329 DataLogImpl::~DataLogImpl() {
    330   StopThread();
    331   Flush();  // Write any remaining rows
    332   delete file_writer_thread_;
    333   delete flush_event_;
    334   for (TableMap::iterator it = tables_.begin(); it != tables_.end();) {
    335     delete static_cast<LogTable*>(it->second);
    336     // For maps all iterators (except the erased) are valid after an erase
    337     tables_.erase(it++);
    338   }
    339   delete tables_lock_;
    340 }
    341 
    342 int DataLogImpl::CreateLog() {
    343   CriticalSectionScoped synchronize(crit_sect_.get());
    344   if (instance_ == NULL) {
    345     instance_ = new DataLogImpl();
    346     return instance_->Init();
    347   } else {
    348     ++instance_->counter_;
    349   }
    350   return 0;
    351 }
    352 
    353 int DataLogImpl::Init() {
    354   file_writer_thread_ = ThreadWrapper::CreateThread(
    355                           DataLogImpl::Run,
    356                           instance_,
    357                           kHighestPriority,
    358                           "DataLog");
    359   if (file_writer_thread_ == NULL)
    360     return -1;
    361   unsigned int thread_id = 0;
    362   bool success = file_writer_thread_->Start(thread_id);
    363   if (!success)
    364     return -1;
    365   return 0;
    366 }
    367 
    368 DataLogImpl* DataLogImpl::StaticInstance() {
    369   return instance_;
    370 }
    371 
    372 void DataLogImpl::ReturnLog() {
    373   CriticalSectionScoped synchronize(crit_sect_.get());
    374   if (instance_ && instance_->counter_ > 1) {
    375     --instance_->counter_;
    376     return;
    377   }
    378   delete instance_;
    379   instance_ = NULL;
    380 }
    381 
    382 int DataLogImpl::AddTable(const std::string& table_name) {
    383   WriteLockScoped synchronize(*tables_lock_);
    384   // Make sure we don't add a table which already exists
    385   if (tables_.count(table_name) > 0)
    386     return -1;
    387   tables_[table_name] = new LogTable();
    388   if (tables_[table_name]->CreateLogFile(table_name + ".txt") == -1)
    389     return -1;
    390   return 0;
    391 }
    392 
    393 int DataLogImpl::AddColumn(const std::string& table_name,
    394                            const std::string& column_name,
    395                            int multi_value_length) {
    396   ReadLockScoped synchronize(*tables_lock_);
    397   if (tables_.count(table_name) == 0)
    398     return -1;
    399   return tables_[table_name]->AddColumn(column_name, multi_value_length);
    400 }
    401 
    402 int DataLogImpl::InsertCell(const std::string& table_name,
    403                             const std::string& column_name,
    404                             const Container* value_container) {
    405   ReadLockScoped synchronize(*tables_lock_);
    406   assert(tables_.count(table_name) > 0);
    407   if (tables_.count(table_name) == 0)
    408     return -1;
    409   return tables_[table_name]->InsertCell(column_name, value_container);
    410 }
    411 
    412 int DataLogImpl::NextRow(const std::string& table_name) {
    413   ReadLockScoped synchronize(*tables_lock_);
    414   if (tables_.count(table_name) == 0)
    415     return -1;
    416   tables_[table_name]->NextRow();
    417   if (file_writer_thread_ == NULL) {
    418     // Write every row to file as they get complete.
    419     tables_[table_name]->Flush();
    420   } else {
    421     // Signal a complete row
    422     flush_event_->Set();
    423   }
    424   return 0;
    425 }
    426 
    427 void DataLogImpl::Flush() {
    428   ReadLockScoped synchronize(*tables_lock_);
    429   for (TableMap::iterator it = tables_.begin(); it != tables_.end(); ++it) {
    430     it->second->Flush();
    431   }
    432 }
    433 
    434 bool DataLogImpl::Run(void* obj) {
    435   static_cast<DataLogImpl*>(obj)->Process();
    436   return true;
    437 }
    438 
    439 void DataLogImpl::Process() {
    440   // Wait for a row to be complete
    441   flush_event_->Wait(WEBRTC_EVENT_INFINITE);
    442   Flush();
    443 }
    444 
    445 void DataLogImpl::StopThread() {
    446   if (file_writer_thread_ != NULL) {
    447     file_writer_thread_->SetNotAlive();
    448     flush_event_->Set();
    449     // Call Stop() repeatedly, waiting for the Flush() call in Process() to
    450     // finish.
    451     while (!file_writer_thread_->Stop()) continue;
    452   }
    453 }
    454 
    455 }  // namespace webrtc
    456