Home | History | Annotate | Download | only in trace_processor
      1 /*
      2  * Copyright (C) 2018 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include "src/trace_processor/span_join_operator_table.h"
     18 
     19 #include <sqlite3.h>
     20 #include <string.h>
     21 #include <algorithm>
     22 #include <set>
     23 #include <utility>
     24 
     25 #include "perfetto/base/logging.h"
     26 #include "perfetto/base/string_splitter.h"
     27 #include "perfetto/base/string_utils.h"
     28 #include "perfetto/base/string_view.h"
     29 #include "src/trace_processor/sqlite_utils.h"
     30 
     31 namespace perfetto {
     32 namespace trace_processor {
     33 
     34 namespace {
     35 
     36 constexpr char kTsColumnName[] = "ts";
     37 constexpr char kDurColumnName[] = "dur";
     38 
     39 bool IsRequiredColumn(const std::string& name) {
     40   return name == kTsColumnName || name == kDurColumnName;
     41 }
     42 
     43 bool HasDuplicateColumns(const std::vector<Table::Column>& cols) {
     44   std::set<std::string> names;
     45   for (const auto& col : cols) {
     46     if (names.count(col.name()) > 0) {
     47       PERFETTO_ELOG("Column '%s' present in the output schema more than once.",
     48                     col.name().c_str());
     49       return true;
     50     }
     51     names.insert(col.name());
     52   }
     53   return false;
     54 }
     55 }  // namespace
     56 
     57 SpanJoinOperatorTable::SpanJoinOperatorTable(sqlite3* db, const TraceStorage*)
     58     : db_(db) {}
     59 
     60 void SpanJoinOperatorTable::RegisterTable(sqlite3* db,
     61                                           const TraceStorage* storage) {
     62   Table::Register<SpanJoinOperatorTable>(db, storage, "span_join",
     63                                          /* read_write */ false,
     64                                          /* requires_args */ true);
     65 
     66   Table::Register<SpanJoinOperatorTable>(db, storage, "span_left_join",
     67                                          /* read_write */ false,
     68                                          /* requires_args */ true);
     69 
     70   Table::Register<SpanJoinOperatorTable>(db, storage, "span_outer_join",
     71                                          /* read_write */ false,
     72                                          /* requires_args */ true);
     73 }
     74 
     75 base::Optional<Table::Schema> SpanJoinOperatorTable::Init(
     76     int argc,
     77     const char* const* argv) {
     78   // argv[0] - argv[2] are SQLite populated fields which are always present.
     79   if (argc < 5) {
     80     PERFETTO_ELOG("SPAN JOIN expected at least 2 args, received %d", argc - 3);
     81     return base::nullopt;
     82   }
     83 
     84   auto maybe_t1_desc = TableDescriptor::Parse(
     85       std::string(reinterpret_cast<const char*>(argv[3])));
     86   if (!maybe_t1_desc.has_value())
     87     return base::nullopt;
     88   auto t1_desc = *maybe_t1_desc;
     89 
     90   auto maybe_t2_desc = TableDescriptor::Parse(
     91       std::string(reinterpret_cast<const char*>(argv[4])));
     92   if (!maybe_t2_desc.has_value())
     93     return base::nullopt;
     94   auto t2_desc = *maybe_t2_desc;
     95 
     96   if (t1_desc.partition_col == t2_desc.partition_col) {
     97     partitioning_ = t1_desc.IsPartitioned()
     98                         ? PartitioningType::kSamePartitioning
     99                         : PartitioningType::kNoPartitioning;
    100     if (partitioning_ == PartitioningType::kNoPartitioning && IsOuterJoin()) {
    101       PERFETTO_ELOG("Outer join not supported for no partition tables");
    102       return base::nullopt;
    103     }
    104   } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
    105     PERFETTO_ELOG("Mismatching partitions (%s, %s)",
    106                   t1_desc.partition_col.c_str(), t2_desc.partition_col.c_str());
    107     return base::nullopt;
    108   } else {
    109     if (IsOuterJoin()) {
    110       PERFETTO_ELOG("Outer join not supported for mixed partitioned tables");
    111       return base::nullopt;
    112     }
    113     partitioning_ = PartitioningType::kMixedPartitioning;
    114   }
    115 
    116   auto maybe_t1_defn = CreateTableDefinition(t1_desc, IsOuterJoin());
    117   if (!maybe_t1_defn.has_value())
    118     return base::nullopt;
    119   t1_defn_ = maybe_t1_defn.value();
    120 
    121   auto maybe_t2_defn =
    122       CreateTableDefinition(t2_desc, IsOuterJoin() || IsLeftJoin());
    123   if (!maybe_t2_defn.has_value())
    124     return base::nullopt;
    125   t2_defn_ = maybe_t2_defn.value();
    126 
    127   std::vector<Table::Column> cols;
    128   // Ensure the shared columns are consistently ordered and are not
    129   // present twice in the final schema
    130   cols.emplace_back(Column::kTimestamp, kTsColumnName, ColumnType::kLong);
    131   cols.emplace_back(Column::kDuration, kDurColumnName, ColumnType::kLong);
    132   if (partitioning_ != PartitioningType::kNoPartitioning)
    133     cols.emplace_back(Column::kPartition, partition_col(), ColumnType::kLong);
    134 
    135   CreateSchemaColsForDefn(t1_defn_, &cols);
    136   CreateSchemaColsForDefn(t2_defn_, &cols);
    137 
    138   if (HasDuplicateColumns(cols)) {
    139     return base::nullopt;
    140   }
    141   std::vector<size_t> primary_keys = {Column::kTimestamp};
    142   if (partitioning_ != PartitioningType::kNoPartitioning)
    143     primary_keys.push_back(Column::kPartition);
    144   return Schema(cols, primary_keys);
    145 }
    146 
    147 void SpanJoinOperatorTable::CreateSchemaColsForDefn(
    148     const TableDefinition& defn,
    149     std::vector<Table::Column>* cols) {
    150   for (size_t i = 0; i < defn.columns().size(); i++) {
    151     const auto& n = defn.columns()[i].name();
    152     if (IsRequiredColumn(n) || n == defn.partition_col())
    153       continue;
    154 
    155     ColumnLocator* locator = &global_index_to_column_locator_[cols->size()];
    156     locator->defn = &defn;
    157     locator->col_index = i;
    158 
    159     cols->emplace_back(cols->size(), n, defn.columns()[i].type());
    160   }
    161 }
    162 
    163 std::unique_ptr<Table::Cursor> SpanJoinOperatorTable::CreateCursor() {
    164   return std::unique_ptr<SpanJoinOperatorTable::Cursor>(new Cursor(this, db_));
    165 }
    166 
    167 int SpanJoinOperatorTable::BestIndex(const QueryConstraints&, BestIndexInfo*) {
    168   // TODO(lalitm): figure out cost estimation.
    169   return SQLITE_OK;
    170 }
    171 
    172 std::vector<std::string>
    173 SpanJoinOperatorTable::ComputeSqlConstraintsForDefinition(
    174     const TableDefinition& defn,
    175     const QueryConstraints& qc,
    176     sqlite3_value** argv) {
    177   std::vector<std::string> constraints;
    178   for (size_t i = 0; i < qc.constraints().size(); i++) {
    179     const auto& cs = qc.constraints()[i];
    180     auto col_name = GetNameForGlobalColumnIndex(defn, cs.iColumn);
    181     if (col_name == "")
    182       continue;
    183 
    184     if (col_name == kTsColumnName || col_name == kDurColumnName) {
    185       // We don't support constraints on ts or duration in the child tables.
    186       PERFETTO_DFATAL("ts or duration constraints on child tables");
    187       continue;
    188     }
    189     auto op = sqlite_utils::OpToString(cs.op);
    190     auto value = sqlite_utils::SqliteValueAsString(argv[i]);
    191 
    192     constraints.emplace_back("`" + col_name + "`" + op + value);
    193   }
    194   return constraints;
    195 }
    196 
    197 base::Optional<SpanJoinOperatorTable::TableDefinition>
    198 SpanJoinOperatorTable::CreateTableDefinition(const TableDescriptor& desc,
    199                                              bool emit_shadow_slices) {
    200   auto cols = sqlite_utils::GetColumnsForTable(db_, desc.name);
    201 
    202   uint32_t required_columns_found = 0;
    203   uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
    204   uint32_t dur_idx = std::numeric_limits<uint32_t>::max();
    205   uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
    206   for (uint32_t i = 0; i < cols.size(); i++) {
    207     auto col = cols[i];
    208     if (IsRequiredColumn(col.name())) {
    209       ++required_columns_found;
    210       if (col.type() != Table::ColumnType::kLong &&
    211           col.type() != Table::ColumnType::kUnknown) {
    212         PERFETTO_ELOG("Invalid column type for %s", col.name().c_str());
    213         return base::nullopt;
    214       }
    215     }
    216 
    217     if (col.name() == kTsColumnName) {
    218       ts_idx = i;
    219     } else if (col.name() == kDurColumnName) {
    220       dur_idx = i;
    221     } else if (col.name() == desc.partition_col) {
    222       partition_idx = i;
    223     }
    224   }
    225   if (required_columns_found != 2) {
    226     PERFETTO_ELOG("Required columns not found (found %d)",
    227                   required_columns_found);
    228     return base::nullopt;
    229   }
    230 
    231   PERFETTO_DCHECK(ts_idx < cols.size());
    232   PERFETTO_DCHECK(dur_idx < cols.size());
    233   PERFETTO_DCHECK(desc.partition_col.empty() || partition_idx < cols.size());
    234 
    235   return TableDefinition(desc.name, desc.partition_col, std::move(cols),
    236                          emit_shadow_slices, ts_idx, dur_idx, partition_idx);
    237 }
    238 
    239 std::string SpanJoinOperatorTable::GetNameForGlobalColumnIndex(
    240     const TableDefinition& defn,
    241     int global_column) {
    242   size_t col_idx = static_cast<size_t>(global_column);
    243   if (col_idx == Column::kTimestamp)
    244     return kTsColumnName;
    245   else if (col_idx == Column::kDuration)
    246     return kDurColumnName;
    247   else if (col_idx == Column::kPartition &&
    248            partitioning_ != PartitioningType::kNoPartitioning)
    249     return defn.partition_col().c_str();
    250 
    251   const auto& locator = global_index_to_column_locator_[col_idx];
    252   if (locator.defn != &defn)
    253     return "";
    254   return defn.columns()[locator.col_index].name().c_str();
    255 }
    256 
    257 SpanJoinOperatorTable::Cursor::Cursor(SpanJoinOperatorTable* table, sqlite3* db)
    258     : Table::Cursor(table),
    259       t1_(table, &table->t1_defn_, db),
    260       t2_(table, &table->t2_defn_, db),
    261       table_(table) {}
    262 
    263 int SpanJoinOperatorTable::Cursor::Filter(const QueryConstraints& qc,
    264                                           sqlite3_value** argv) {
    265   int err = t1_.Initialize(qc, argv);
    266   if (err != SQLITE_OK)
    267     return err;
    268 
    269   err = t2_.Initialize(qc, argv);
    270   if (err != SQLITE_OK)
    271     return err;
    272 
    273   // Step the partitioned table to allow for us to look into it below.
    274   Query* step_now = t1_.IsPartitioned() ? &t1_ : &t2_;
    275   next_stepped_ = step_now == &t1_ ? &t2_ : &t1_;
    276 
    277   auto res = step_now->Step();
    278   if (PERFETTO_UNLIKELY(res.is_err()))
    279     return res.err_code;
    280 
    281   // Forward the unpartitioned table to reflect the partition of the partitoined
    282   // table.
    283   if (table_->partitioning_ == PartitioningType::kMixedPartitioning) {
    284     PERFETTO_DCHECK(step_now->IsPartitioned());
    285 
    286     // If we emit shadow slices, we need to step because the first slice will
    287     // be a full partition shadow slice that we need to skip.
    288     if (step_now->definition()->emit_shadow_slices()) {
    289       PERFETTO_DCHECK(step_now->IsFullPartitionShadowSlice());
    290       res = step_now->StepToNextPartition();
    291       if (PERFETTO_UNLIKELY(res.is_err()))
    292         return res.err_code;
    293     }
    294 
    295     res = next_stepped_->StepToPartition(step_now->partition());
    296     if (PERFETTO_UNLIKELY(res.is_err()))
    297       return res.err_code;
    298   }
    299 
    300   // Otherwise, find an overlapping span.
    301   return Next();
    302 }
    303 
    304 bool SpanJoinOperatorTable::Cursor::IsOverlappingSpan() {
    305   if (!t1_.IsRealSlice() && !t2_.IsRealSlice()) {
    306     return false;
    307   } else if (t1_.partition() != t2_.partition()) {
    308     return false;
    309   } else if (t1_.ts_end() <= t2_.ts_start() || t2_.ts_end() <= t1_.ts_start()) {
    310     return false;
    311   }
    312   return true;
    313 }
    314 
    315 int SpanJoinOperatorTable::Cursor::Next() {
    316   // TODO: Propagate error msg to the table.
    317   auto res = next_stepped_->Step();
    318   if (res.is_err())
    319     return res.err_code;
    320 
    321   while (true) {
    322     if (t1_.Eof() || t2_.Eof()) {
    323       if (table_->partitioning_ != PartitioningType::kMixedPartitioning)
    324         return SQLITE_OK;
    325 
    326       auto* partitioned = t1_.IsPartitioned() ? &t1_ : &t2_;
    327       auto* unpartitioned = t1_.IsPartitioned() ? &t2_ : &t1_;
    328       if (partitioned->Eof())
    329         return SQLITE_OK;
    330 
    331       res = partitioned->StepToNextPartition();
    332       if (PERFETTO_UNLIKELY(res.is_err()))
    333         return res.err_code;
    334       else if (PERFETTO_UNLIKELY(res.is_eof()))
    335         continue;
    336 
    337       res = unpartitioned->StepToPartition(partitioned->partition());
    338       if (PERFETTO_UNLIKELY(res.is_err()))
    339         return res.err_code;
    340       else if (PERFETTO_UNLIKELY(res.is_eof()))
    341         continue;
    342     }
    343 
    344     int64_t partition = std::max(t1_.partition(), t2_.partition());
    345     res = t1_.StepToPartition(partition);
    346     if (PERFETTO_UNLIKELY(res.is_err()))
    347       return res.err_code;
    348     else if (PERFETTO_UNLIKELY(res.is_eof()))
    349       continue;
    350 
    351     res = t2_.StepToPartition(t1_.partition());
    352     if (PERFETTO_UNLIKELY(res.is_err()))
    353       return res.err_code;
    354     else if (PERFETTO_UNLIKELY(res.is_eof()))
    355       continue;
    356 
    357     if (t1_.partition() != t2_.partition())
    358       continue;
    359 
    360     auto ts = std::max(t1_.ts_start(), t2_.ts_start());
    361     res = t1_.StepUntil(ts);
    362     if (PERFETTO_UNLIKELY(res.is_err()))
    363       return res.err_code;
    364     else if (PERFETTO_UNLIKELY(res.is_eof()))
    365       continue;
    366 
    367     res = t2_.StepUntil(t1_.ts_start());
    368     if (PERFETTO_UNLIKELY(res.is_err()))
    369       return res.err_code;
    370     else if (PERFETTO_UNLIKELY(res.is_eof()))
    371       continue;
    372 
    373     // If we're in the case where we have shadow slices on both tables, try
    374     // and forward the earliest table and see what happens. IsOverlappingSpan()
    375     // will double check that we have at least one non-real slice now.
    376     // Note: if we don't do this, we end up in an infinite loop because all
    377     // the code above will not change anything because these shadow slices will
    378     // be overlapping.
    379     if (!t1_.IsRealSlice() && !t2_.IsRealSlice()) {
    380       PERFETTO_DCHECK(t1_.partition() == t2_.partition());
    381 
    382       // If the table is not partitioned, partition() will return the partition
    383       // the table was set to have by StepToPartition().
    384       auto t1_partition =
    385           t1_.IsPartitioned() ? t1_.CursorPartition() : t1_.partition();
    386       auto t2_partition =
    387           t2_.IsPartitioned() ? t2_.CursorPartition() : t2_.partition();
    388 
    389       Query* stepped;
    390       if (t1_partition == t2_partition) {
    391         stepped = t1_.ts_end() <= t2_.ts_end() ? &t1_ : &t2_;
    392       } else {
    393         stepped = t1_partition <= t2_partition ? &t1_ : &t2_;
    394       }
    395 
    396       res = stepped->Step();
    397       if (PERFETTO_UNLIKELY(res.is_err()))
    398         return res.err_code;
    399       else if (PERFETTO_UNLIKELY(res.is_eof()))
    400         continue;
    401     }
    402 
    403     if (IsOverlappingSpan())
    404       break;
    405   }
    406   next_stepped_ = t1_.ts_end() <= t2_.ts_end() ? &t1_ : &t2_;
    407 
    408   return SQLITE_OK;
    409 }
    410 
    411 int SpanJoinOperatorTable::Cursor::Eof() {
    412   return t1_.Eof() || t2_.Eof();
    413 }
    414 
    415 int SpanJoinOperatorTable::Cursor::Column(sqlite3_context* context, int N) {
    416   PERFETTO_DCHECK(!t1_.Eof());
    417   PERFETTO_DCHECK(!t2_.Eof());
    418 
    419   if (N == Column::kTimestamp) {
    420     auto max_ts = std::max(t1_.ts_start(), t2_.ts_start());
    421     sqlite3_result_int64(context, static_cast<sqlite3_int64>(max_ts));
    422   } else if (N == Column::kDuration) {
    423     auto max_start = std::max(t1_.ts_start(), t2_.ts_start());
    424     auto min_end = std::min(t1_.ts_end(), t2_.ts_end());
    425     PERFETTO_DCHECK(min_end > max_start);
    426     auto dur = min_end - max_start;
    427     sqlite3_result_int64(context, static_cast<sqlite3_int64>(dur));
    428   } else if (N == Column::kPartition &&
    429              table_->partitioning_ != PartitioningType::kNoPartitioning) {
    430     PERFETTO_DCHECK(table_->partitioning_ ==
    431                         PartitioningType::kMixedPartitioning ||
    432                     t1_.partition() == t2_.partition());
    433     auto partition = t1_.IsPartitioned() ? t1_.partition() : t2_.partition();
    434     sqlite3_result_int64(context, static_cast<sqlite3_int64>(partition));
    435   } else {
    436     size_t index = static_cast<size_t>(N);
    437     const auto& locator = table_->global_index_to_column_locator_[index];
    438     if (locator.defn == t1_.definition())
    439       t1_.ReportSqliteResult(context, locator.col_index);
    440     else
    441       t2_.ReportSqliteResult(context, locator.col_index);
    442   }
    443   return SQLITE_OK;
    444 }
    445 
    446 SpanJoinOperatorTable::Query::Query(SpanJoinOperatorTable* table,
    447                                     const TableDefinition* definition,
    448                                     sqlite3* db)
    449     : defn_(definition), db_(db), table_(table) {
    450   PERFETTO_DCHECK(!defn_->IsPartitioned() ||
    451                   defn_->partition_idx() < defn_->columns().size());
    452 }
    453 
    454 SpanJoinOperatorTable::Query::~Query() = default;
    455 
    456 int SpanJoinOperatorTable::Query::Initialize(const QueryConstraints& qc,
    457                                              sqlite3_value** argv) {
    458   *this = Query(table_, definition(), db_);
    459   sql_query_ = CreateSqlQuery(
    460       table_->ComputeSqlConstraintsForDefinition(*defn_, qc, argv));
    461   return PrepareRawStmt();
    462 }
    463 
    464 SpanJoinOperatorTable::Query::StepRet SpanJoinOperatorTable::Query::Step() {
    465   PERFETTO_DCHECK(!Eof());
    466   sqlite3_stmt* stmt = stmt_.get();
    467 
    468   // In this loop, we will try and find the slice to from the cursor.
    469   // Terminology: "Shadow slices" are slices which fill in the gaps between real
    470   // slices from the underlying cursor in each partition (if any).
    471   // For queries which don't need "shadow slices", we simply return non-zero
    472   // duration slices from the underlying cursor.
    473   do {
    474     if (mode_ == Mode::kShadowSlice) {
    475       PERFETTO_DCHECK(defn_->emit_shadow_slices());
    476 
    477       // If we're out of slices in the cursor, this shadow slice will be the
    478       // final slice.
    479       if (cursor_eof_) {
    480         mode_ = Mode::kRealSlice;
    481         return StepRet(StepRet::Code::kEof);
    482       }
    483 
    484       // Look ahead to see if the cursor changes partition (if the cursor is
    485       // partitioned). If so, then we need to fill the gap between the ts == 0
    486       // and the start of that slice. Otherwise after this slice, we have the
    487       // real slice from the cursor.
    488       if (!defn_->IsPartitioned() || partition_ == CursorPartition()) {
    489         mode_ = Mode::kRealSlice;
    490         ts_start_ = CursorTs();
    491         ts_end_ = ts_start_ + CursorDur();
    492       } else if (IsFullPartitionShadowSlice()) {
    493         mode_ = Mode::kShadowSlice;
    494         ts_start_ = 0;
    495         ts_end_ = CursorTs();
    496         partition_ = CursorPartition();
    497       } else {
    498         mode_ = Mode::kShadowSlice;
    499         ts_start_ = 0;
    500         ts_end_ = std::numeric_limits<int64_t>::max();
    501       }
    502       continue;
    503     }
    504 
    505     int res;
    506     if (defn_->IsPartitioned()) {
    507       auto partition_idx = static_cast<int>(defn_->partition_idx());
    508       // Fastforward through any rows with null partition keys.
    509       int row_type;
    510       do {
    511         res = sqlite3_step(stmt);
    512         row_type = sqlite3_column_type(stmt, partition_idx);
    513       } while (res == SQLITE_ROW && row_type == SQLITE_NULL);
    514     } else {
    515       res = sqlite3_step(stmt);
    516     }
    517 
    518     if (res == SQLITE_ROW) {
    519       // After every row, there will be a shadow slice so emit that if we need
    520       // to do so. Otherwise, just emit the underlying slice.
    521       if (defn_->emit_shadow_slices()) {
    522         mode_ = Mode::kShadowSlice;
    523         ts_start_ = ts_end_;
    524         ts_end_ = !defn_->IsPartitioned() || partition_ == CursorPartition()
    525                       ? CursorTs()
    526                       : std::numeric_limits<int64_t>::max();
    527       } else {
    528         mode_ = Mode::kRealSlice;
    529         ts_start_ = CursorTs();
    530         ts_end_ = ts_start_ + CursorDur();
    531         if (defn_->IsPartitioned())
    532           partition_ = CursorPartition();
    533       }
    534     } else if (res == SQLITE_DONE) {
    535       cursor_eof_ = true;
    536       if (!defn_->emit_shadow_slices())
    537         return StepRet(StepRet::Code::kEof);
    538 
    539       // Close off the remainder of this partition with a shadow slice.
    540       mode_ = Mode::kShadowSlice;
    541       ts_start_ = ts_end_;
    542       ts_end_ = std::numeric_limits<int64_t>::max();
    543     } else {
    544       return StepRet(StepRet::Code::kError, res);
    545     }
    546   } while (ts_start_ == ts_end_);
    547 
    548   return StepRet(StepRet::Code::kRow);
    549 }
    550 
    551 SpanJoinOperatorTable::Query::StepRet
    552 SpanJoinOperatorTable::Query::StepToNextPartition() {
    553   PERFETTO_DCHECK(defn_->IsPartitioned());
    554   PERFETTO_DCHECK(!Eof());
    555 
    556   auto current_partition = partition_;
    557   while (partition_ <= current_partition) {
    558     auto res = Step();
    559     if (!res.is_row())
    560       return res;
    561   }
    562   return StepRet(StepRet::Code::kRow);
    563 }
    564 
    565 SpanJoinOperatorTable::Query::StepRet
    566 SpanJoinOperatorTable::Query::StepToPartition(int64_t target_partition) {
    567   PERFETTO_DCHECK(partition_ <= target_partition);
    568   if (defn_->IsPartitioned()) {
    569     while (partition_ < target_partition) {
    570       if (IsFullPartitionShadowSlice() &&
    571           target_partition < CursorPartition()) {
    572         partition_ = target_partition;
    573         return StepRet(StepRet::Code::kRow);
    574       }
    575 
    576       auto res = Step();
    577       if (!res.is_row())
    578         return res;
    579     }
    580   } else if (/* !defn_->IsPartitioned() && */ partition_ < target_partition) {
    581     int res = PrepareRawStmt();
    582     if (res != SQLITE_OK)
    583       return StepRet(StepRet::Code::kError, res);
    584     partition_ = target_partition;
    585   }
    586   return StepRet(StepRet::Code::kRow);
    587 }
    588 
    589 SpanJoinOperatorTable::Query::StepRet SpanJoinOperatorTable::Query::StepUntil(
    590     int64_t timestamp) {
    591   PERFETTO_DCHECK(!Eof());
    592   auto partition = partition_;
    593   while (partition_ == partition && ts_end_ <= timestamp) {
    594     auto res = Step();
    595     if (!res.is_row())
    596       return res;
    597   }
    598   return StepRet(StepRet::Code::kRow);
    599 }
    600 
    601 std::string SpanJoinOperatorTable::Query::CreateSqlQuery(
    602     const std::vector<std::string>& cs) const {
    603   std::vector<std::string> col_names;
    604   for (const Table::Column& c : defn_->columns()) {
    605     col_names.push_back("`" + c.name() + "`");
    606   }
    607 
    608   std::string sql = "SELECT " + base::Join(col_names, ", ");
    609   sql += " FROM " + defn_->name();
    610   if (!cs.empty()) {
    611     sql += " WHERE " + base::Join(cs, " AND ");
    612   }
    613   sql += " ORDER BY ";
    614   sql += defn_->IsPartitioned()
    615              ? base::Join({"`" + defn_->partition_col() + "`", "ts"}, ", ")
    616              : "ts";
    617   sql += ";";
    618   PERFETTO_DLOG("%s", sql.c_str());
    619   return sql;
    620 }
    621 
    622 int SpanJoinOperatorTable::Query::PrepareRawStmt() {
    623   sqlite3_stmt* stmt = nullptr;
    624   int err =
    625       sqlite3_prepare_v2(db_, sql_query_.c_str(),
    626                          static_cast<int>(sql_query_.size()), &stmt, nullptr);
    627   stmt_.reset(stmt);
    628 
    629   ts_start_ = 0;
    630   ts_end_ = 0;
    631   partition_ = std::numeric_limits<int64_t>::lowest();
    632   cursor_eof_ = false;
    633   mode_ = Mode::kRealSlice;
    634 
    635   return err;
    636 }
    637 
    638 void SpanJoinOperatorTable::Query::ReportSqliteResult(sqlite3_context* context,
    639                                                       size_t index) {
    640   if (mode_ != Mode::kRealSlice) {
    641     sqlite3_result_null(context);
    642     return;
    643   }
    644 
    645   sqlite3_stmt* stmt = stmt_.get();
    646   int idx = static_cast<int>(index);
    647   switch (sqlite3_column_type(stmt, idx)) {
    648     case SQLITE_INTEGER:
    649       sqlite3_result_int64(context, sqlite3_column_int64(stmt, idx));
    650       break;
    651     case SQLITE_FLOAT:
    652       sqlite3_result_double(context, sqlite3_column_double(stmt, idx));
    653       break;
    654     case SQLITE_TEXT: {
    655       // TODO(lalitm): note for future optimizations: if we knew the addresses
    656       // of the string intern pool, we could check if the string returned here
    657       // comes from the pool, and pass it as non-transient.
    658       const auto kSqliteTransient =
    659           reinterpret_cast<sqlite3_destructor_type>(-1);
    660       auto ptr = reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
    661       sqlite3_result_text(context, ptr, -1, kSqliteTransient);
    662       break;
    663     }
    664   }
    665 }
    666 
    667 SpanJoinOperatorTable::TableDefinition::TableDefinition(
    668     std::string name,
    669     std::string partition_col,
    670     std::vector<Table::Column> cols,
    671     bool emit_shadow_slices,
    672     uint32_t ts_idx,
    673     uint32_t dur_idx,
    674     uint32_t partition_idx)
    675     : name_(std::move(name)),
    676       partition_col_(std::move(partition_col)),
    677       cols_(std::move(cols)),
    678       emit_shadow_slices_(emit_shadow_slices),
    679       ts_idx_(ts_idx),
    680       dur_idx_(dur_idx),
    681       partition_idx_(partition_idx) {}
    682 
    683 base::Optional<SpanJoinOperatorTable::TableDescriptor>
    684 SpanJoinOperatorTable::TableDescriptor::Parse(
    685     const std::string& raw_descriptor) {
    686   // Descriptors have one of the following forms:
    687   // table_name [PARTITIONED column_name]
    688 
    689   // Find the table name.
    690   base::StringSplitter splitter(raw_descriptor, ' ');
    691   if (!splitter.Next())
    692     return base::nullopt;
    693 
    694   TableDescriptor descriptor;
    695   descriptor.name = splitter.cur_token();
    696   if (!splitter.Next())
    697     return std::move(descriptor);
    698 
    699   if (strcasecmp(splitter.cur_token(), "PARTITIONED") != 0) {
    700     PERFETTO_ELOG("Invalid SPAN_JOIN token %s", splitter.cur_token());
    701     return base::nullopt;
    702   }
    703   if (!splitter.Next()) {
    704     PERFETTO_ELOG("Missing partitioning column");
    705     return base::nullopt;
    706   }
    707 
    708   descriptor.partition_col = splitter.cur_token();
    709   return std::move(descriptor);
    710 }
    711 
    712 }  // namespace trace_processor
    713 }  // namespace perfetto
    714