1 /* 2 * Copyright (c) 2011 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "data_log.h" 12 13 #include <assert.h> 14 15 #include <algorithm> 16 #include <list> 17 18 #include "critical_section_wrapper.h" 19 #include "event_wrapper.h" 20 #include "file_wrapper.h" 21 #include "rw_lock_wrapper.h" 22 #include "thread_wrapper.h" 23 24 namespace webrtc { 25 26 DataLogImpl::CritSectScopedPtr DataLogImpl::crit_sect_( 27 CriticalSectionWrapper::CreateCriticalSection()); 28 29 DataLogImpl* DataLogImpl::instance_ = NULL; 30 31 // A Row contains cells, which are indexed by the column names as std::string. 32 // The string index is treated in a case sensitive way. 33 class Row { 34 public: 35 Row(); 36 ~Row(); 37 38 // Inserts a Container into the cell of the column specified with 39 // column_name. 40 // column_name is treated in a case sensitive way. 41 int InsertCell(const std::string& column_name, 42 const Container* value_container); 43 44 // Converts the value at the column specified by column_name to a string 45 // stored in value_string. 46 // column_name is treated in a case sensitive way. 47 void ToString(const std::string& column_name, std::string* value_string); 48 49 private: 50 // Collection of containers indexed by column name as std::string 51 typedef std::map<std::string, const Container*> CellMap; 52 53 CellMap cells_; 54 CriticalSectionWrapper* cells_lock_; 55 }; 56 57 // A LogTable contains multiple rows, where only the latest row is active for 58 // editing. The rows are defined by the ColumnMap, which contains the name of 59 // each column and the length of the column (1 for one-value-columns and greater 60 // than 1 for multi-value-columns). 61 class LogTable { 62 public: 63 LogTable(); 64 ~LogTable(); 65 66 // Adds the column with name column_name to the table. The column will be a 67 // multi-value-column if multi_value_length is greater than 1. 68 // column_name is treated in a case sensitive way. 69 int AddColumn(const std::string& column_name, int multi_value_length); 70 71 // Buffers the current row while it is waiting to be written to file, 72 // which is done by a call to Flush(). A new row is available when the 73 // function returns 74 void NextRow(); 75 76 // Inserts a Container into the cell of the column specified with 77 // column_name. 78 // column_name is treated in a case sensitive way. 79 int InsertCell(const std::string& column_name, 80 const Container* value_container); 81 82 // Creates a log file, named as specified in the string file_name, to 83 // where the table will be written when calling Flush(). 84 int CreateLogFile(const std::string& file_name); 85 86 // Write all complete rows to file. 87 // May not be called by two threads simultaneously (doing so may result in 88 // a race condition). Will be called by the file_writer_thread_ when that 89 // thread is running. 90 void Flush(); 91 92 private: 93 // Collection of multi_value_lengths indexed by column name as std::string 94 typedef std::map<std::string, int> ColumnMap; 95 typedef std::list<Row*> RowList; 96 97 ColumnMap columns_; 98 RowList rows_[2]; 99 RowList* rows_history_; 100 RowList* rows_flush_; 101 Row* current_row_; 102 FileWrapper* file_; 103 bool write_header_; 104 CriticalSectionWrapper* table_lock_; 105 }; 106 107 Row::Row() 108 : cells_(), 109 cells_lock_(CriticalSectionWrapper::CreateCriticalSection()) { 110 } 111 112 Row::~Row() { 113 for (CellMap::iterator it = cells_.begin(); it != cells_.end();) { 114 delete it->second; 115 // For maps all iterators (except the erased) are valid after an erase 116 cells_.erase(it++); 117 } 118 delete cells_lock_; 119 } 120 121 int Row::InsertCell(const std::string& column_name, 122 const Container* value_container) { 123 CriticalSectionScoped synchronize(cells_lock_); 124 assert(cells_.count(column_name) == 0); 125 if (cells_.count(column_name) > 0) 126 return -1; 127 cells_[column_name] = value_container; 128 return 0; 129 } 130 131 void Row::ToString(const std::string& column_name, 132 std::string* value_string) { 133 CriticalSectionScoped synchronize(cells_lock_); 134 const Container* container = cells_[column_name]; 135 if (container == NULL) { 136 *value_string = "NaN,"; 137 return; 138 } 139 container->ToString(value_string); 140 } 141 142 LogTable::LogTable() 143 : columns_(), 144 rows_(), 145 rows_history_(&rows_[0]), 146 rows_flush_(&rows_[1]), 147 current_row_(new Row), 148 file_(FileWrapper::Create()), 149 write_header_(true), 150 table_lock_(CriticalSectionWrapper::CreateCriticalSection()) { 151 } 152 153 LogTable::~LogTable() { 154 for (RowList::iterator row_it = rows_history_->begin(); 155 row_it != rows_history_->end();) { 156 delete *row_it; 157 row_it = rows_history_->erase(row_it); 158 } 159 for (ColumnMap::iterator col_it = columns_.begin(); 160 col_it != columns_.end();) { 161 // For maps all iterators (except the erased) are valid after an erase 162 columns_.erase(col_it++); 163 } 164 if (file_ != NULL) { 165 file_->Flush(); 166 file_->CloseFile(); 167 delete file_; 168 } 169 delete current_row_; 170 delete table_lock_; 171 } 172 173 int LogTable::AddColumn(const std::string& column_name, 174 int multi_value_length) { 175 assert(multi_value_length > 0); 176 if (!write_header_) { 177 // It's not allowed to add new columns after the header 178 // has been written. 179 assert(false); 180 return -1; 181 } else { 182 CriticalSectionScoped synchronize(table_lock_); 183 if (write_header_) 184 columns_[column_name] = multi_value_length; 185 else 186 return -1; 187 } 188 return 0; 189 } 190 191 void LogTable::NextRow() { 192 CriticalSectionScoped sync_rows(table_lock_); 193 rows_history_->push_back(current_row_); 194 current_row_ = new Row; 195 } 196 197 int LogTable::InsertCell(const std::string& column_name, 198 const Container* value_container) { 199 CriticalSectionScoped synchronize(table_lock_); 200 assert(columns_.count(column_name) > 0); 201 if (columns_.count(column_name) == 0) 202 return -1; 203 return current_row_->InsertCell(column_name, value_container); 204 } 205 206 int LogTable::CreateLogFile(const std::string& file_name) { 207 if (file_name.length() == 0) 208 return -1; 209 if (file_->Open()) 210 return -1; 211 file_->OpenFile(file_name.c_str(), 212 false, // Open with read/write permissions 213 false, // Don't wraparound and write at the beginning when 214 // the file is full 215 true); // Open as a text file 216 if (file_ == NULL) 217 return -1; 218 return 0; 219 } 220 221 void LogTable::Flush() { 222 ColumnMap::iterator column_it; 223 bool commit_header = false; 224 if (write_header_) { 225 CriticalSectionScoped synchronize(table_lock_); 226 if (write_header_) { 227 commit_header = true; 228 write_header_ = false; 229 } 230 } 231 if (commit_header) { 232 for (column_it = columns_.begin(); 233 column_it != columns_.end(); ++column_it) { 234 if (column_it->second > 1) { 235 file_->WriteText("%s[%u],", column_it->first.c_str(), 236 column_it->second); 237 for (int i = 1; i < column_it->second; ++i) 238 file_->WriteText(","); 239 } else { 240 file_->WriteText("%s,", column_it->first.c_str()); 241 } 242 } 243 if (columns_.size() > 0) 244 file_->WriteText("\n"); 245 } 246 247 // Swap the list used for flushing with the list containing the row history 248 // and clear the history. We also create a local pointer to the new 249 // list used for flushing to avoid race conditions if another thread 250 // calls this function while we are writing. 251 // We don't want to block the list while we're writing to file. 252 { 253 CriticalSectionScoped synchronize(table_lock_); 254 RowList* tmp = rows_flush_; 255 rows_flush_ = rows_history_; 256 rows_history_ = tmp; 257 rows_history_->clear(); 258 } 259 260 // Write all complete rows to file and delete them 261 for (RowList::iterator row_it = rows_flush_->begin(); 262 row_it != rows_flush_->end();) { 263 for (column_it = columns_.begin(); 264 column_it != columns_.end(); ++column_it) { 265 std::string row_string; 266 (*row_it)->ToString(column_it->first, &row_string); 267 file_->WriteText("%s", row_string.c_str()); 268 } 269 if (columns_.size() > 0) 270 file_->WriteText("\n"); 271 delete *row_it; 272 row_it = rows_flush_->erase(row_it); 273 } 274 } 275 276 int DataLog::CreateLog() { 277 return DataLogImpl::CreateLog(); 278 } 279 280 void DataLog::ReturnLog() { 281 return DataLogImpl::ReturnLog(); 282 } 283 284 std::string DataLog::Combine(const std::string& table_name, int table_id) { 285 std::stringstream ss; 286 std::string combined_id = table_name; 287 std::string number_suffix; 288 ss << "_" << table_id; 289 ss >> number_suffix; 290 combined_id += number_suffix; 291 std::transform(combined_id.begin(), combined_id.end(), combined_id.begin(), 292 ::tolower); 293 return combined_id; 294 } 295 296 int DataLog::AddTable(const std::string& table_name) { 297 DataLogImpl* data_log = DataLogImpl::StaticInstance(); 298 if (data_log == NULL) 299 return -1; 300 return data_log->AddTable(table_name); 301 } 302 303 int DataLog::AddColumn(const std::string& table_name, 304 const std::string& column_name, 305 int multi_value_length) { 306 DataLogImpl* data_log = DataLogImpl::StaticInstance(); 307 if (data_log == NULL) 308 return -1; 309 return data_log->DataLogImpl::StaticInstance()->AddColumn(table_name, 310 column_name, 311 multi_value_length); 312 } 313 314 int DataLog::NextRow(const std::string& table_name) { 315 DataLogImpl* data_log = DataLogImpl::StaticInstance(); 316 if (data_log == NULL) 317 return -1; 318 return data_log->DataLogImpl::StaticInstance()->NextRow(table_name); 319 } 320 321 DataLogImpl::DataLogImpl() 322 : counter_(1), 323 tables_(), 324 flush_event_(EventWrapper::Create()), 325 file_writer_thread_(NULL), 326 tables_lock_(RWLockWrapper::CreateRWLock()) { 327 } 328 329 DataLogImpl::~DataLogImpl() { 330 StopThread(); 331 Flush(); // Write any remaining rows 332 delete file_writer_thread_; 333 delete flush_event_; 334 for (TableMap::iterator it = tables_.begin(); it != tables_.end();) { 335 delete static_cast<LogTable*>(it->second); 336 // For maps all iterators (except the erased) are valid after an erase 337 tables_.erase(it++); 338 } 339 delete tables_lock_; 340 } 341 342 int DataLogImpl::CreateLog() { 343 CriticalSectionScoped synchronize(crit_sect_.get()); 344 if (instance_ == NULL) { 345 instance_ = new DataLogImpl(); 346 return instance_->Init(); 347 } else { 348 ++instance_->counter_; 349 } 350 return 0; 351 } 352 353 int DataLogImpl::Init() { 354 file_writer_thread_ = ThreadWrapper::CreateThread( 355 DataLogImpl::Run, 356 instance_, 357 kHighestPriority, 358 "DataLog"); 359 if (file_writer_thread_ == NULL) 360 return -1; 361 unsigned int thread_id = 0; 362 bool success = file_writer_thread_->Start(thread_id); 363 if (!success) 364 return -1; 365 return 0; 366 } 367 368 DataLogImpl* DataLogImpl::StaticInstance() { 369 return instance_; 370 } 371 372 void DataLogImpl::ReturnLog() { 373 CriticalSectionScoped synchronize(crit_sect_.get()); 374 if (instance_ && instance_->counter_ > 1) { 375 --instance_->counter_; 376 return; 377 } 378 delete instance_; 379 instance_ = NULL; 380 } 381 382 int DataLogImpl::AddTable(const std::string& table_name) { 383 WriteLockScoped synchronize(*tables_lock_); 384 // Make sure we don't add a table which already exists 385 if (tables_.count(table_name) > 0) 386 return -1; 387 tables_[table_name] = new LogTable(); 388 if (tables_[table_name]->CreateLogFile(table_name + ".txt") == -1) 389 return -1; 390 return 0; 391 } 392 393 int DataLogImpl::AddColumn(const std::string& table_name, 394 const std::string& column_name, 395 int multi_value_length) { 396 ReadLockScoped synchronize(*tables_lock_); 397 if (tables_.count(table_name) == 0) 398 return -1; 399 return tables_[table_name]->AddColumn(column_name, multi_value_length); 400 } 401 402 int DataLogImpl::InsertCell(const std::string& table_name, 403 const std::string& column_name, 404 const Container* value_container) { 405 ReadLockScoped synchronize(*tables_lock_); 406 assert(tables_.count(table_name) > 0); 407 if (tables_.count(table_name) == 0) 408 return -1; 409 return tables_[table_name]->InsertCell(column_name, value_container); 410 } 411 412 int DataLogImpl::NextRow(const std::string& table_name) { 413 ReadLockScoped synchronize(*tables_lock_); 414 if (tables_.count(table_name) == 0) 415 return -1; 416 tables_[table_name]->NextRow(); 417 if (file_writer_thread_ == NULL) { 418 // Write every row to file as they get complete. 419 tables_[table_name]->Flush(); 420 } else { 421 // Signal a complete row 422 flush_event_->Set(); 423 } 424 return 0; 425 } 426 427 void DataLogImpl::Flush() { 428 ReadLockScoped synchronize(*tables_lock_); 429 for (TableMap::iterator it = tables_.begin(); it != tables_.end(); ++it) { 430 it->second->Flush(); 431 } 432 } 433 434 bool DataLogImpl::Run(void* obj) { 435 static_cast<DataLogImpl*>(obj)->Process(); 436 return true; 437 } 438 439 void DataLogImpl::Process() { 440 // Wait for a row to be complete 441 flush_event_->Wait(WEBRTC_EVENT_INFINITE); 442 Flush(); 443 } 444 445 void DataLogImpl::StopThread() { 446 if (file_writer_thread_ != NULL) { 447 file_writer_thread_->SetNotAlive(); 448 flush_event_->Set(); 449 // Call Stop() repeatedly, waiting for the Flush() call in Process() to 450 // finish. 451 while (!file_writer_thread_->Stop()) continue; 452 } 453 } 454 455 } // namespace webrtc 456