1 /* 2 * Copyright (C) 2018 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "src/trace_processor/span_join_operator_table.h" 18 19 #include <sqlite3.h> 20 #include <string.h> 21 #include <algorithm> 22 #include <set> 23 #include <utility> 24 25 #include "perfetto/base/logging.h" 26 #include "perfetto/base/string_splitter.h" 27 #include "perfetto/base/string_utils.h" 28 #include "perfetto/base/string_view.h" 29 #include "src/trace_processor/sqlite_utils.h" 30 31 namespace perfetto { 32 namespace trace_processor { 33 34 namespace { 35 36 constexpr char kTsColumnName[] = "ts"; 37 constexpr char kDurColumnName[] = "dur"; 38 39 bool IsRequiredColumn(const std::string& name) { 40 return name == kTsColumnName || name == kDurColumnName; 41 } 42 43 bool HasDuplicateColumns(const std::vector<Table::Column>& cols) { 44 std::set<std::string> names; 45 for (const auto& col : cols) { 46 if (names.count(col.name()) > 0) { 47 PERFETTO_ELOG("Column '%s' present in the output schema more than once.", 48 col.name().c_str()); 49 return true; 50 } 51 names.insert(col.name()); 52 } 53 return false; 54 } 55 } // namespace 56 57 SpanJoinOperatorTable::SpanJoinOperatorTable(sqlite3* db, const TraceStorage*) 58 : db_(db) {} 59 60 void SpanJoinOperatorTable::RegisterTable(sqlite3* db, 61 const TraceStorage* storage) { 62 Table::Register<SpanJoinOperatorTable>(db, storage, "span_join", 63 /* read_write */ false, 64 /* requires_args */ true); 65 66 Table::Register<SpanJoinOperatorTable>(db, storage, "span_left_join", 67 /* read_write */ false, 68 /* requires_args */ true); 69 70 Table::Register<SpanJoinOperatorTable>(db, storage, "span_outer_join", 71 /* read_write */ false, 72 /* requires_args */ true); 73 } 74 75 base::Optional<Table::Schema> SpanJoinOperatorTable::Init( 76 int argc, 77 const char* const* argv) { 78 // argv[0] - argv[2] are SQLite populated fields which are always present. 79 if (argc < 5) { 80 PERFETTO_ELOG("SPAN JOIN expected at least 2 args, received %d", argc - 3); 81 return base::nullopt; 82 } 83 84 auto maybe_t1_desc = TableDescriptor::Parse( 85 std::string(reinterpret_cast<const char*>(argv[3]))); 86 if (!maybe_t1_desc.has_value()) 87 return base::nullopt; 88 auto t1_desc = *maybe_t1_desc; 89 90 auto maybe_t2_desc = TableDescriptor::Parse( 91 std::string(reinterpret_cast<const char*>(argv[4]))); 92 if (!maybe_t2_desc.has_value()) 93 return base::nullopt; 94 auto t2_desc = *maybe_t2_desc; 95 96 if (t1_desc.partition_col == t2_desc.partition_col) { 97 partitioning_ = t1_desc.IsPartitioned() 98 ? PartitioningType::kSamePartitioning 99 : PartitioningType::kNoPartitioning; 100 if (partitioning_ == PartitioningType::kNoPartitioning && IsOuterJoin()) { 101 PERFETTO_ELOG("Outer join not supported for no partition tables"); 102 return base::nullopt; 103 } 104 } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) { 105 PERFETTO_ELOG("Mismatching partitions (%s, %s)", 106 t1_desc.partition_col.c_str(), t2_desc.partition_col.c_str()); 107 return base::nullopt; 108 } else { 109 if (IsOuterJoin()) { 110 PERFETTO_ELOG("Outer join not supported for mixed partitioned tables"); 111 return base::nullopt; 112 } 113 partitioning_ = PartitioningType::kMixedPartitioning; 114 } 115 116 auto maybe_t1_defn = CreateTableDefinition(t1_desc, IsOuterJoin()); 117 if (!maybe_t1_defn.has_value()) 118 return base::nullopt; 119 t1_defn_ = maybe_t1_defn.value(); 120 121 auto maybe_t2_defn = 122 CreateTableDefinition(t2_desc, IsOuterJoin() || IsLeftJoin()); 123 if (!maybe_t2_defn.has_value()) 124 return base::nullopt; 125 t2_defn_ = maybe_t2_defn.value(); 126 127 std::vector<Table::Column> cols; 128 // Ensure the shared columns are consistently ordered and are not 129 // present twice in the final schema 130 cols.emplace_back(Column::kTimestamp, kTsColumnName, ColumnType::kLong); 131 cols.emplace_back(Column::kDuration, kDurColumnName, ColumnType::kLong); 132 if (partitioning_ != PartitioningType::kNoPartitioning) 133 cols.emplace_back(Column::kPartition, partition_col(), ColumnType::kLong); 134 135 CreateSchemaColsForDefn(t1_defn_, &cols); 136 CreateSchemaColsForDefn(t2_defn_, &cols); 137 138 if (HasDuplicateColumns(cols)) { 139 return base::nullopt; 140 } 141 std::vector<size_t> primary_keys = {Column::kTimestamp}; 142 if (partitioning_ != PartitioningType::kNoPartitioning) 143 primary_keys.push_back(Column::kPartition); 144 return Schema(cols, primary_keys); 145 } 146 147 void SpanJoinOperatorTable::CreateSchemaColsForDefn( 148 const TableDefinition& defn, 149 std::vector<Table::Column>* cols) { 150 for (size_t i = 0; i < defn.columns().size(); i++) { 151 const auto& n = defn.columns()[i].name(); 152 if (IsRequiredColumn(n) || n == defn.partition_col()) 153 continue; 154 155 ColumnLocator* locator = &global_index_to_column_locator_[cols->size()]; 156 locator->defn = &defn; 157 locator->col_index = i; 158 159 cols->emplace_back(cols->size(), n, defn.columns()[i].type()); 160 } 161 } 162 163 std::unique_ptr<Table::Cursor> SpanJoinOperatorTable::CreateCursor() { 164 return std::unique_ptr<SpanJoinOperatorTable::Cursor>(new Cursor(this, db_)); 165 } 166 167 int SpanJoinOperatorTable::BestIndex(const QueryConstraints&, BestIndexInfo*) { 168 // TODO(lalitm): figure out cost estimation. 169 return SQLITE_OK; 170 } 171 172 std::vector<std::string> 173 SpanJoinOperatorTable::ComputeSqlConstraintsForDefinition( 174 const TableDefinition& defn, 175 const QueryConstraints& qc, 176 sqlite3_value** argv) { 177 std::vector<std::string> constraints; 178 for (size_t i = 0; i < qc.constraints().size(); i++) { 179 const auto& cs = qc.constraints()[i]; 180 auto col_name = GetNameForGlobalColumnIndex(defn, cs.iColumn); 181 if (col_name == "") 182 continue; 183 184 if (col_name == kTsColumnName || col_name == kDurColumnName) { 185 // We don't support constraints on ts or duration in the child tables. 186 PERFETTO_DFATAL("ts or duration constraints on child tables"); 187 continue; 188 } 189 auto op = sqlite_utils::OpToString(cs.op); 190 auto value = sqlite_utils::SqliteValueAsString(argv[i]); 191 192 constraints.emplace_back("`" + col_name + "`" + op + value); 193 } 194 return constraints; 195 } 196 197 base::Optional<SpanJoinOperatorTable::TableDefinition> 198 SpanJoinOperatorTable::CreateTableDefinition(const TableDescriptor& desc, 199 bool emit_shadow_slices) { 200 auto cols = sqlite_utils::GetColumnsForTable(db_, desc.name); 201 202 uint32_t required_columns_found = 0; 203 uint32_t ts_idx = std::numeric_limits<uint32_t>::max(); 204 uint32_t dur_idx = std::numeric_limits<uint32_t>::max(); 205 uint32_t partition_idx = std::numeric_limits<uint32_t>::max(); 206 for (uint32_t i = 0; i < cols.size(); i++) { 207 auto col = cols[i]; 208 if (IsRequiredColumn(col.name())) { 209 ++required_columns_found; 210 if (col.type() != Table::ColumnType::kLong && 211 col.type() != Table::ColumnType::kUnknown) { 212 PERFETTO_ELOG("Invalid column type for %s", col.name().c_str()); 213 return base::nullopt; 214 } 215 } 216 217 if (col.name() == kTsColumnName) { 218 ts_idx = i; 219 } else if (col.name() == kDurColumnName) { 220 dur_idx = i; 221 } else if (col.name() == desc.partition_col) { 222 partition_idx = i; 223 } 224 } 225 if (required_columns_found != 2) { 226 PERFETTO_ELOG("Required columns not found (found %d)", 227 required_columns_found); 228 return base::nullopt; 229 } 230 231 PERFETTO_DCHECK(ts_idx < cols.size()); 232 PERFETTO_DCHECK(dur_idx < cols.size()); 233 PERFETTO_DCHECK(desc.partition_col.empty() || partition_idx < cols.size()); 234 235 return TableDefinition(desc.name, desc.partition_col, std::move(cols), 236 emit_shadow_slices, ts_idx, dur_idx, partition_idx); 237 } 238 239 std::string SpanJoinOperatorTable::GetNameForGlobalColumnIndex( 240 const TableDefinition& defn, 241 int global_column) { 242 size_t col_idx = static_cast<size_t>(global_column); 243 if (col_idx == Column::kTimestamp) 244 return kTsColumnName; 245 else if (col_idx == Column::kDuration) 246 return kDurColumnName; 247 else if (col_idx == Column::kPartition && 248 partitioning_ != PartitioningType::kNoPartitioning) 249 return defn.partition_col().c_str(); 250 251 const auto& locator = global_index_to_column_locator_[col_idx]; 252 if (locator.defn != &defn) 253 return ""; 254 return defn.columns()[locator.col_index].name().c_str(); 255 } 256 257 SpanJoinOperatorTable::Cursor::Cursor(SpanJoinOperatorTable* table, sqlite3* db) 258 : Table::Cursor(table), 259 t1_(table, &table->t1_defn_, db), 260 t2_(table, &table->t2_defn_, db), 261 table_(table) {} 262 263 int SpanJoinOperatorTable::Cursor::Filter(const QueryConstraints& qc, 264 sqlite3_value** argv) { 265 int err = t1_.Initialize(qc, argv); 266 if (err != SQLITE_OK) 267 return err; 268 269 err = t2_.Initialize(qc, argv); 270 if (err != SQLITE_OK) 271 return err; 272 273 // Step the partitioned table to allow for us to look into it below. 274 Query* step_now = t1_.IsPartitioned() ? &t1_ : &t2_; 275 next_stepped_ = step_now == &t1_ ? &t2_ : &t1_; 276 277 auto res = step_now->Step(); 278 if (PERFETTO_UNLIKELY(res.is_err())) 279 return res.err_code; 280 281 // Forward the unpartitioned table to reflect the partition of the partitoined 282 // table. 283 if (table_->partitioning_ == PartitioningType::kMixedPartitioning) { 284 PERFETTO_DCHECK(step_now->IsPartitioned()); 285 286 // If we emit shadow slices, we need to step because the first slice will 287 // be a full partition shadow slice that we need to skip. 288 if (step_now->definition()->emit_shadow_slices()) { 289 PERFETTO_DCHECK(step_now->IsFullPartitionShadowSlice()); 290 res = step_now->StepToNextPartition(); 291 if (PERFETTO_UNLIKELY(res.is_err())) 292 return res.err_code; 293 } 294 295 res = next_stepped_->StepToPartition(step_now->partition()); 296 if (PERFETTO_UNLIKELY(res.is_err())) 297 return res.err_code; 298 } 299 300 // Otherwise, find an overlapping span. 301 return Next(); 302 } 303 304 bool SpanJoinOperatorTable::Cursor::IsOverlappingSpan() { 305 if (!t1_.IsRealSlice() && !t2_.IsRealSlice()) { 306 return false; 307 } else if (t1_.partition() != t2_.partition()) { 308 return false; 309 } else if (t1_.ts_end() <= t2_.ts_start() || t2_.ts_end() <= t1_.ts_start()) { 310 return false; 311 } 312 return true; 313 } 314 315 int SpanJoinOperatorTable::Cursor::Next() { 316 // TODO: Propagate error msg to the table. 317 auto res = next_stepped_->Step(); 318 if (res.is_err()) 319 return res.err_code; 320 321 while (true) { 322 if (t1_.Eof() || t2_.Eof()) { 323 if (table_->partitioning_ != PartitioningType::kMixedPartitioning) 324 return SQLITE_OK; 325 326 auto* partitioned = t1_.IsPartitioned() ? &t1_ : &t2_; 327 auto* unpartitioned = t1_.IsPartitioned() ? &t2_ : &t1_; 328 if (partitioned->Eof()) 329 return SQLITE_OK; 330 331 res = partitioned->StepToNextPartition(); 332 if (PERFETTO_UNLIKELY(res.is_err())) 333 return res.err_code; 334 else if (PERFETTO_UNLIKELY(res.is_eof())) 335 continue; 336 337 res = unpartitioned->StepToPartition(partitioned->partition()); 338 if (PERFETTO_UNLIKELY(res.is_err())) 339 return res.err_code; 340 else if (PERFETTO_UNLIKELY(res.is_eof())) 341 continue; 342 } 343 344 int64_t partition = std::max(t1_.partition(), t2_.partition()); 345 res = t1_.StepToPartition(partition); 346 if (PERFETTO_UNLIKELY(res.is_err())) 347 return res.err_code; 348 else if (PERFETTO_UNLIKELY(res.is_eof())) 349 continue; 350 351 res = t2_.StepToPartition(t1_.partition()); 352 if (PERFETTO_UNLIKELY(res.is_err())) 353 return res.err_code; 354 else if (PERFETTO_UNLIKELY(res.is_eof())) 355 continue; 356 357 if (t1_.partition() != t2_.partition()) 358 continue; 359 360 auto ts = std::max(t1_.ts_start(), t2_.ts_start()); 361 res = t1_.StepUntil(ts); 362 if (PERFETTO_UNLIKELY(res.is_err())) 363 return res.err_code; 364 else if (PERFETTO_UNLIKELY(res.is_eof())) 365 continue; 366 367 res = t2_.StepUntil(t1_.ts_start()); 368 if (PERFETTO_UNLIKELY(res.is_err())) 369 return res.err_code; 370 else if (PERFETTO_UNLIKELY(res.is_eof())) 371 continue; 372 373 // If we're in the case where we have shadow slices on both tables, try 374 // and forward the earliest table and see what happens. IsOverlappingSpan() 375 // will double check that we have at least one non-real slice now. 376 // Note: if we don't do this, we end up in an infinite loop because all 377 // the code above will not change anything because these shadow slices will 378 // be overlapping. 379 if (!t1_.IsRealSlice() && !t2_.IsRealSlice()) { 380 PERFETTO_DCHECK(t1_.partition() == t2_.partition()); 381 382 // If the table is not partitioned, partition() will return the partition 383 // the table was set to have by StepToPartition(). 384 auto t1_partition = 385 t1_.IsPartitioned() ? t1_.CursorPartition() : t1_.partition(); 386 auto t2_partition = 387 t2_.IsPartitioned() ? t2_.CursorPartition() : t2_.partition(); 388 389 Query* stepped; 390 if (t1_partition == t2_partition) { 391 stepped = t1_.ts_end() <= t2_.ts_end() ? &t1_ : &t2_; 392 } else { 393 stepped = t1_partition <= t2_partition ? &t1_ : &t2_; 394 } 395 396 res = stepped->Step(); 397 if (PERFETTO_UNLIKELY(res.is_err())) 398 return res.err_code; 399 else if (PERFETTO_UNLIKELY(res.is_eof())) 400 continue; 401 } 402 403 if (IsOverlappingSpan()) 404 break; 405 } 406 next_stepped_ = t1_.ts_end() <= t2_.ts_end() ? &t1_ : &t2_; 407 408 return SQLITE_OK; 409 } 410 411 int SpanJoinOperatorTable::Cursor::Eof() { 412 return t1_.Eof() || t2_.Eof(); 413 } 414 415 int SpanJoinOperatorTable::Cursor::Column(sqlite3_context* context, int N) { 416 PERFETTO_DCHECK(!t1_.Eof()); 417 PERFETTO_DCHECK(!t2_.Eof()); 418 419 if (N == Column::kTimestamp) { 420 auto max_ts = std::max(t1_.ts_start(), t2_.ts_start()); 421 sqlite3_result_int64(context, static_cast<sqlite3_int64>(max_ts)); 422 } else if (N == Column::kDuration) { 423 auto max_start = std::max(t1_.ts_start(), t2_.ts_start()); 424 auto min_end = std::min(t1_.ts_end(), t2_.ts_end()); 425 PERFETTO_DCHECK(min_end > max_start); 426 auto dur = min_end - max_start; 427 sqlite3_result_int64(context, static_cast<sqlite3_int64>(dur)); 428 } else if (N == Column::kPartition && 429 table_->partitioning_ != PartitioningType::kNoPartitioning) { 430 PERFETTO_DCHECK(table_->partitioning_ == 431 PartitioningType::kMixedPartitioning || 432 t1_.partition() == t2_.partition()); 433 auto partition = t1_.IsPartitioned() ? t1_.partition() : t2_.partition(); 434 sqlite3_result_int64(context, static_cast<sqlite3_int64>(partition)); 435 } else { 436 size_t index = static_cast<size_t>(N); 437 const auto& locator = table_->global_index_to_column_locator_[index]; 438 if (locator.defn == t1_.definition()) 439 t1_.ReportSqliteResult(context, locator.col_index); 440 else 441 t2_.ReportSqliteResult(context, locator.col_index); 442 } 443 return SQLITE_OK; 444 } 445 446 SpanJoinOperatorTable::Query::Query(SpanJoinOperatorTable* table, 447 const TableDefinition* definition, 448 sqlite3* db) 449 : defn_(definition), db_(db), table_(table) { 450 PERFETTO_DCHECK(!defn_->IsPartitioned() || 451 defn_->partition_idx() < defn_->columns().size()); 452 } 453 454 SpanJoinOperatorTable::Query::~Query() = default; 455 456 int SpanJoinOperatorTable::Query::Initialize(const QueryConstraints& qc, 457 sqlite3_value** argv) { 458 *this = Query(table_, definition(), db_); 459 sql_query_ = CreateSqlQuery( 460 table_->ComputeSqlConstraintsForDefinition(*defn_, qc, argv)); 461 return PrepareRawStmt(); 462 } 463 464 SpanJoinOperatorTable::Query::StepRet SpanJoinOperatorTable::Query::Step() { 465 PERFETTO_DCHECK(!Eof()); 466 sqlite3_stmt* stmt = stmt_.get(); 467 468 // In this loop, we will try and find the slice to from the cursor. 469 // Terminology: "Shadow slices" are slices which fill in the gaps between real 470 // slices from the underlying cursor in each partition (if any). 471 // For queries which don't need "shadow slices", we simply return non-zero 472 // duration slices from the underlying cursor. 473 do { 474 if (mode_ == Mode::kShadowSlice) { 475 PERFETTO_DCHECK(defn_->emit_shadow_slices()); 476 477 // If we're out of slices in the cursor, this shadow slice will be the 478 // final slice. 479 if (cursor_eof_) { 480 mode_ = Mode::kRealSlice; 481 return StepRet(StepRet::Code::kEof); 482 } 483 484 // Look ahead to see if the cursor changes partition (if the cursor is 485 // partitioned). If so, then we need to fill the gap between the ts == 0 486 // and the start of that slice. Otherwise after this slice, we have the 487 // real slice from the cursor. 488 if (!defn_->IsPartitioned() || partition_ == CursorPartition()) { 489 mode_ = Mode::kRealSlice; 490 ts_start_ = CursorTs(); 491 ts_end_ = ts_start_ + CursorDur(); 492 } else if (IsFullPartitionShadowSlice()) { 493 mode_ = Mode::kShadowSlice; 494 ts_start_ = 0; 495 ts_end_ = CursorTs(); 496 partition_ = CursorPartition(); 497 } else { 498 mode_ = Mode::kShadowSlice; 499 ts_start_ = 0; 500 ts_end_ = std::numeric_limits<int64_t>::max(); 501 } 502 continue; 503 } 504 505 int res; 506 if (defn_->IsPartitioned()) { 507 auto partition_idx = static_cast<int>(defn_->partition_idx()); 508 // Fastforward through any rows with null partition keys. 509 int row_type; 510 do { 511 res = sqlite3_step(stmt); 512 row_type = sqlite3_column_type(stmt, partition_idx); 513 } while (res == SQLITE_ROW && row_type == SQLITE_NULL); 514 } else { 515 res = sqlite3_step(stmt); 516 } 517 518 if (res == SQLITE_ROW) { 519 // After every row, there will be a shadow slice so emit that if we need 520 // to do so. Otherwise, just emit the underlying slice. 521 if (defn_->emit_shadow_slices()) { 522 mode_ = Mode::kShadowSlice; 523 ts_start_ = ts_end_; 524 ts_end_ = !defn_->IsPartitioned() || partition_ == CursorPartition() 525 ? CursorTs() 526 : std::numeric_limits<int64_t>::max(); 527 } else { 528 mode_ = Mode::kRealSlice; 529 ts_start_ = CursorTs(); 530 ts_end_ = ts_start_ + CursorDur(); 531 if (defn_->IsPartitioned()) 532 partition_ = CursorPartition(); 533 } 534 } else if (res == SQLITE_DONE) { 535 cursor_eof_ = true; 536 if (!defn_->emit_shadow_slices()) 537 return StepRet(StepRet::Code::kEof); 538 539 // Close off the remainder of this partition with a shadow slice. 540 mode_ = Mode::kShadowSlice; 541 ts_start_ = ts_end_; 542 ts_end_ = std::numeric_limits<int64_t>::max(); 543 } else { 544 return StepRet(StepRet::Code::kError, res); 545 } 546 } while (ts_start_ == ts_end_); 547 548 return StepRet(StepRet::Code::kRow); 549 } 550 551 SpanJoinOperatorTable::Query::StepRet 552 SpanJoinOperatorTable::Query::StepToNextPartition() { 553 PERFETTO_DCHECK(defn_->IsPartitioned()); 554 PERFETTO_DCHECK(!Eof()); 555 556 auto current_partition = partition_; 557 while (partition_ <= current_partition) { 558 auto res = Step(); 559 if (!res.is_row()) 560 return res; 561 } 562 return StepRet(StepRet::Code::kRow); 563 } 564 565 SpanJoinOperatorTable::Query::StepRet 566 SpanJoinOperatorTable::Query::StepToPartition(int64_t target_partition) { 567 PERFETTO_DCHECK(partition_ <= target_partition); 568 if (defn_->IsPartitioned()) { 569 while (partition_ < target_partition) { 570 if (IsFullPartitionShadowSlice() && 571 target_partition < CursorPartition()) { 572 partition_ = target_partition; 573 return StepRet(StepRet::Code::kRow); 574 } 575 576 auto res = Step(); 577 if (!res.is_row()) 578 return res; 579 } 580 } else if (/* !defn_->IsPartitioned() && */ partition_ < target_partition) { 581 int res = PrepareRawStmt(); 582 if (res != SQLITE_OK) 583 return StepRet(StepRet::Code::kError, res); 584 partition_ = target_partition; 585 } 586 return StepRet(StepRet::Code::kRow); 587 } 588 589 SpanJoinOperatorTable::Query::StepRet SpanJoinOperatorTable::Query::StepUntil( 590 int64_t timestamp) { 591 PERFETTO_DCHECK(!Eof()); 592 auto partition = partition_; 593 while (partition_ == partition && ts_end_ <= timestamp) { 594 auto res = Step(); 595 if (!res.is_row()) 596 return res; 597 } 598 return StepRet(StepRet::Code::kRow); 599 } 600 601 std::string SpanJoinOperatorTable::Query::CreateSqlQuery( 602 const std::vector<std::string>& cs) const { 603 std::vector<std::string> col_names; 604 for (const Table::Column& c : defn_->columns()) { 605 col_names.push_back("`" + c.name() + "`"); 606 } 607 608 std::string sql = "SELECT " + base::Join(col_names, ", "); 609 sql += " FROM " + defn_->name(); 610 if (!cs.empty()) { 611 sql += " WHERE " + base::Join(cs, " AND "); 612 } 613 sql += " ORDER BY "; 614 sql += defn_->IsPartitioned() 615 ? base::Join({"`" + defn_->partition_col() + "`", "ts"}, ", ") 616 : "ts"; 617 sql += ";"; 618 PERFETTO_DLOG("%s", sql.c_str()); 619 return sql; 620 } 621 622 int SpanJoinOperatorTable::Query::PrepareRawStmt() { 623 sqlite3_stmt* stmt = nullptr; 624 int err = 625 sqlite3_prepare_v2(db_, sql_query_.c_str(), 626 static_cast<int>(sql_query_.size()), &stmt, nullptr); 627 stmt_.reset(stmt); 628 629 ts_start_ = 0; 630 ts_end_ = 0; 631 partition_ = std::numeric_limits<int64_t>::lowest(); 632 cursor_eof_ = false; 633 mode_ = Mode::kRealSlice; 634 635 return err; 636 } 637 638 void SpanJoinOperatorTable::Query::ReportSqliteResult(sqlite3_context* context, 639 size_t index) { 640 if (mode_ != Mode::kRealSlice) { 641 sqlite3_result_null(context); 642 return; 643 } 644 645 sqlite3_stmt* stmt = stmt_.get(); 646 int idx = static_cast<int>(index); 647 switch (sqlite3_column_type(stmt, idx)) { 648 case SQLITE_INTEGER: 649 sqlite3_result_int64(context, sqlite3_column_int64(stmt, idx)); 650 break; 651 case SQLITE_FLOAT: 652 sqlite3_result_double(context, sqlite3_column_double(stmt, idx)); 653 break; 654 case SQLITE_TEXT: { 655 // TODO(lalitm): note for future optimizations: if we knew the addresses 656 // of the string intern pool, we could check if the string returned here 657 // comes from the pool, and pass it as non-transient. 658 const auto kSqliteTransient = 659 reinterpret_cast<sqlite3_destructor_type>(-1); 660 auto ptr = reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx)); 661 sqlite3_result_text(context, ptr, -1, kSqliteTransient); 662 break; 663 } 664 } 665 } 666 667 SpanJoinOperatorTable::TableDefinition::TableDefinition( 668 std::string name, 669 std::string partition_col, 670 std::vector<Table::Column> cols, 671 bool emit_shadow_slices, 672 uint32_t ts_idx, 673 uint32_t dur_idx, 674 uint32_t partition_idx) 675 : name_(std::move(name)), 676 partition_col_(std::move(partition_col)), 677 cols_(std::move(cols)), 678 emit_shadow_slices_(emit_shadow_slices), 679 ts_idx_(ts_idx), 680 dur_idx_(dur_idx), 681 partition_idx_(partition_idx) {} 682 683 base::Optional<SpanJoinOperatorTable::TableDescriptor> 684 SpanJoinOperatorTable::TableDescriptor::Parse( 685 const std::string& raw_descriptor) { 686 // Descriptors have one of the following forms: 687 // table_name [PARTITIONED column_name] 688 689 // Find the table name. 690 base::StringSplitter splitter(raw_descriptor, ' '); 691 if (!splitter.Next()) 692 return base::nullopt; 693 694 TableDescriptor descriptor; 695 descriptor.name = splitter.cur_token(); 696 if (!splitter.Next()) 697 return std::move(descriptor); 698 699 if (strcasecmp(splitter.cur_token(), "PARTITIONED") != 0) { 700 PERFETTO_ELOG("Invalid SPAN_JOIN token %s", splitter.cur_token()); 701 return base::nullopt; 702 } 703 if (!splitter.Next()) { 704 PERFETTO_ELOG("Missing partitioning column"); 705 return base::nullopt; 706 } 707 708 descriptor.partition_col = splitter.cur_token(); 709 return std::move(descriptor); 710 } 711 712 } // namespace trace_processor 713 } // namespace perfetto 714