1 /* Copyright 2018 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/contrib/bigtable/kernels/test_kernels/bigtable_test_client.h" 17 18 #include "external/com_github_googleapis_googleapis/google/bigtable/v2/data.pb.h" 19 #include "google/protobuf/wrappers.pb.h" 20 #include "re2/re2.h" 21 #include "tensorflow/core/lib/strings/stringprintf.h" 22 #include "tensorflow/core/util/ptr_util.h" 23 // #include "util/task/codes.pb.h" 24 25 namespace tensorflow { 26 namespace { 27 28 void UpdateRow(const ::google::bigtable::v2::Mutation& mut, 29 std::map<string, string>* row) { 30 if (mut.has_set_cell()) { 31 CHECK(mut.set_cell().timestamp_micros() >= -1) 32 << "Timestamp_micros: " << mut.set_cell().timestamp_micros(); 33 auto col = 34 strings::Printf("%s:%s", mut.set_cell().family_name().c_str(), 35 string(mut.set_cell().column_qualifier()).c_str()); 36 (*row)[col] = string(mut.set_cell().value()); 37 } else if (mut.has_delete_from_column()) { 38 auto col = strings::Printf( 39 "%s:%s", mut.delete_from_column().family_name().c_str(), 40 string(mut.delete_from_column().column_qualifier()).c_str()); 41 row->erase(col); 42 } else if (mut.has_delete_from_family()) { 43 auto itr = row->lower_bound(mut.delete_from_family().family_name()); 44 auto prefix = 45 strings::Printf("%s:", mut.delete_from_family().family_name().c_str()); 46 while (itr != row->end() && itr->first.substr(0, prefix.size()) == prefix) { 47 row->erase(itr); 48 } 49 } else if (mut.has_delete_from_row()) { 50 row->clear(); 51 } else { 52 LOG(ERROR) << "Unknown mutation: " << mut.ShortDebugString(); 53 } 54 } 55 56 } // namespace 57 58 class SampleRowKeysResponse : public grpc::ClientReaderInterface< 59 google::bigtable::v2::SampleRowKeysResponse> { 60 public: 61 explicit SampleRowKeysResponse(BigtableTestClient* client) 62 : client_(client) {} 63 64 bool NextMessageSize(uint32_t* sz) override { 65 mutex_lock l(mu_); 66 mutex_lock l2(client_->mu_); 67 if (num_messages_sent_ * 2 < client_->table_.rows.size()) { 68 *sz = 10000; // A sufficiently high enough value to not worry about. 69 return true; 70 } 71 return false; 72 } 73 74 bool Read(google::bigtable::v2::SampleRowKeysResponse* resp) override { 75 // Send every other key from the table. 76 mutex_lock l(mu_); 77 mutex_lock l2(client_->mu_); 78 *resp = google::bigtable::v2::SampleRowKeysResponse(); 79 auto itr = client_->table_.rows.begin(); 80 for (uint64 i = 0; i < 2 * num_messages_sent_; ++i) { 81 ++itr; 82 if (itr == client_->table_.rows.end()) { 83 return false; 84 } 85 } 86 resp->set_row_key(itr->first); 87 resp->set_offset_bytes(100 * num_messages_sent_); 88 num_messages_sent_++; 89 return true; 90 } 91 92 grpc::Status Finish() override { return grpc::Status::OK; } 93 94 void WaitForInitialMetadata() override {} // Do nothing. 95 96 private: 97 mutex mu_; 98 int64 num_messages_sent_ GUARDED_BY(mu_) = 0; 99 BigtableTestClient* client_; // Not owned. 100 }; 101 102 class ReadRowsResponse : public grpc::ClientReaderInterface< 103 google::bigtable::v2::ReadRowsResponse> { 104 public: 105 ReadRowsResponse(BigtableTestClient* client, 106 google::bigtable::v2::ReadRowsRequest const& request) 107 : client_(client), request_(request) {} 108 109 bool NextMessageSize(uint32_t* sz) override { 110 mutex_lock l(mu_); 111 if (sent_first_message_) { 112 return false; 113 } 114 *sz = 10000000; // A sufficiently high enough value to not worry about. 115 return true; 116 } 117 118 bool Read(google::bigtable::v2::ReadRowsResponse* resp) override { 119 mutex_lock l(mu_); 120 if (sent_first_message_) { 121 return false; 122 } 123 sent_first_message_ = true; 124 RowFilter filter = MakeRowFilter(); 125 126 mutex_lock l2(client_->mu_); 127 *resp = google::bigtable::v2::ReadRowsResponse(); 128 // Send all contents in first response. 129 for (auto itr = client_->table_.rows.begin(); 130 itr != client_->table_.rows.end(); ++itr) { 131 if (filter.AllowRow(itr->first)) { 132 ::google::bigtable::v2::ReadRowsResponse_CellChunk* chunk = nullptr; 133 bool sent_first = false; 134 for (auto col_itr = itr->second.columns.begin(); 135 col_itr != itr->second.columns.end(); ++col_itr) { 136 if (filter.AllowColumn(col_itr->first)) { 137 chunk = resp->add_chunks(); 138 if (!sent_first) { 139 sent_first = true; 140 chunk->set_row_key(itr->first); 141 } 142 auto colon_idx = col_itr->first.find(":"); 143 CHECK(colon_idx != string::npos) 144 << "No ':' found in: " << col_itr->first; 145 chunk->mutable_family_name()->set_value( 146 string(col_itr->first, 0, colon_idx)); 147 chunk->mutable_qualifier()->set_value( 148 string(col_itr->first, ++colon_idx)); 149 if (!filter.strip_values) { 150 chunk->set_value(col_itr->second); 151 } 152 if (filter.only_one_column) { 153 break; 154 } 155 } 156 } 157 if (sent_first) { 158 // We are sending this row, so set the commit flag on the last chunk. 159 chunk->set_commit_row(true); 160 } 161 } 162 } 163 return true; 164 } 165 166 grpc::Status Finish() override { return grpc::Status::OK; } 167 168 void WaitForInitialMetadata() override {} // Do nothing. 169 170 private: 171 struct RowFilter { 172 std::set<string> row_set; 173 std::vector<std::pair<string, string>> row_ranges; 174 double row_sample = 0.0; // Note: currently ignored. 175 std::unique_ptr<RE2> col_filter; 176 bool strip_values = false; 177 bool only_one_column = false; 178 179 bool AllowRow(const string& row) { 180 if (row_set.find(row) != row_set.end()) { 181 return true; 182 } 183 for (const auto& range : row_ranges) { 184 if (range.first <= row && range.second > row) { 185 return true; 186 } 187 } 188 return false; 189 } 190 191 bool AllowColumn(const string& col) { 192 if (col_filter) { 193 return RE2::FullMatch(col, *col_filter); 194 } else { 195 return true; 196 } 197 } 198 }; 199 200 RowFilter MakeRowFilter() { 201 RowFilter filter; 202 for (auto i = request_.rows().row_keys().begin(); 203 i != request_.rows().row_keys().end(); ++i) { 204 filter.row_set.insert(string(*i)); 205 } 206 for (auto i = request_.rows().row_ranges().begin(); 207 i != request_.rows().row_ranges().end(); ++i) { 208 if (i->start_key_case() != 209 google::bigtable::v2::RowRange::kStartKeyClosed || 210 i->end_key_case() != google::bigtable::v2::RowRange::kEndKeyOpen) { 211 LOG(WARNING) << "Skipping row range that cannot be processed: " 212 << i->ShortDebugString(); 213 continue; 214 } 215 filter.row_ranges.emplace_back(std::make_pair( 216 string(i->start_key_closed()), string(i->end_key_open()))); 217 } 218 if (request_.filter().has_chain()) { 219 string family_filter; 220 string qualifier_filter; 221 for (auto i = request_.filter().chain().filters().begin(); 222 i != request_.filter().chain().filters().end(); ++i) { 223 switch (i->filter_case()) { 224 case google::bigtable::v2::RowFilter::kFamilyNameRegexFilter: 225 family_filter = i->family_name_regex_filter(); 226 break; 227 case google::bigtable::v2::RowFilter::kColumnQualifierRegexFilter: 228 qualifier_filter = i->column_qualifier_regex_filter(); 229 break; 230 case google::bigtable::v2::RowFilter::kCellsPerColumnLimitFilter: 231 if (i->cells_per_column_limit_filter() != 1) { 232 LOG(ERROR) << "Unexpected cells_per_column_limit_filter: " 233 << i->cells_per_column_limit_filter(); 234 } 235 break; 236 case google::bigtable::v2::RowFilter::kStripValueTransformer: 237 filter.strip_values = i->strip_value_transformer(); 238 break; 239 case google::bigtable::v2::RowFilter::kRowSampleFilter: 240 LOG(INFO) << "Ignoring row sample directive."; 241 break; 242 case google::bigtable::v2::RowFilter::kPassAllFilter: 243 break; 244 case google::bigtable::v2::RowFilter::kCellsPerRowLimitFilter: 245 filter.only_one_column = true; 246 break; 247 default: 248 LOG(WARNING) << "Ignoring unknown filter type: " 249 << i->ShortDebugString(); 250 } 251 } 252 if (family_filter.empty() || qualifier_filter.empty()) { 253 LOG(WARNING) << "Missing regex!"; 254 } else { 255 string regex = strings::Printf("%s:%s", family_filter.c_str(), 256 qualifier_filter.c_str()); 257 filter.col_filter.reset(new RE2(regex)); 258 } 259 } else { 260 LOG(WARNING) << "Read request did not have a filter chain specified: " 261 << request_.filter().DebugString(); 262 } 263 return filter; 264 } 265 266 mutex mu_; 267 bool sent_first_message_ GUARDED_BY(mu_) = false; 268 BigtableTestClient* client_; // Not owned. 269 const google::bigtable::v2::ReadRowsRequest request_; 270 }; 271 272 class MutateRowsResponse : public grpc::ClientReaderInterface< 273 google::bigtable::v2::MutateRowsResponse> { 274 public: 275 explicit MutateRowsResponse(size_t num_successes) 276 : num_successes_(num_successes) {} 277 278 bool NextMessageSize(uint32_t* sz) override { 279 mutex_lock l(mu_); 280 if (sent_first_message_) { 281 return false; 282 } 283 *sz = 10000000; // A sufficiently high enough value to not worry about. 284 return true; 285 } 286 287 bool Read(google::bigtable::v2::MutateRowsResponse* resp) override { 288 mutex_lock l(mu_); 289 if (sent_first_message_) { 290 return false; 291 } 292 sent_first_message_ = true; 293 *resp = google::bigtable::v2::MutateRowsResponse(); 294 for (size_t i = 0; i < num_successes_; ++i) { 295 auto entry = resp->add_entries(); 296 entry->set_index(i); 297 } 298 return true; 299 } 300 301 grpc::Status Finish() override { return grpc::Status::OK; } 302 303 void WaitForInitialMetadata() override {} // Do nothing. 304 305 private: 306 const size_t num_successes_; 307 308 mutex mu_; 309 bool sent_first_message_ = false; 310 }; 311 312 grpc::Status BigtableTestClient::MutateRow( 313 grpc::ClientContext* context, 314 google::bigtable::v2::MutateRowRequest const& request, 315 google::bigtable::v2::MutateRowResponse* response) { 316 mutex_lock l(mu_); 317 auto* row = &table_.rows[string(request.row_key())]; 318 for (int i = 0; i < request.mutations_size(); ++i) { 319 UpdateRow(request.mutations(i), &row->columns); 320 } 321 *response = google::bigtable::v2::MutateRowResponse(); 322 return grpc::Status::OK; 323 } 324 grpc::Status BigtableTestClient::CheckAndMutateRow( 325 grpc::ClientContext* context, 326 google::bigtable::v2::CheckAndMutateRowRequest const& request, 327 google::bigtable::v2::CheckAndMutateRowResponse* response) { 328 return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, 329 "CheckAndMutateRow not implemented."); 330 } 331 grpc::Status BigtableTestClient::ReadModifyWriteRow( 332 grpc::ClientContext* context, 333 google::bigtable::v2::ReadModifyWriteRowRequest const& request, 334 google::bigtable::v2::ReadModifyWriteRowResponse* response) { 335 return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, 336 "ReadModifyWriteRow not implemented."); 337 } 338 std::unique_ptr<grpc::ClientAsyncResponseReaderInterface< 339 google::bigtable::v2::ReadModifyWriteRowResponse>> 340 BigtableTestClient::AsyncReadModifyWriteRow( 341 grpc::ClientContext* context, 342 google::bigtable::v2::ReadModifyWriteRowRequest const& request, 343 grpc::CompletionQueue* cq) { 344 LOG(WARNING) << "Call to AsyncReadModifyWriteRow:" << __func__ 345 << "(); this will likely cause a crash!"; 346 return nullptr; 347 } 348 349 std::unique_ptr< 350 grpc::ClientReaderInterface<google::bigtable::v2::ReadRowsResponse>> 351 BigtableTestClient::ReadRows( 352 grpc::ClientContext* context, 353 google::bigtable::v2::ReadRowsRequest const& request) { 354 return MakeUnique<ReadRowsResponse>(this, request); 355 } 356 357 std::unique_ptr< 358 grpc::ClientReaderInterface<google::bigtable::v2::SampleRowKeysResponse>> 359 BigtableTestClient::SampleRowKeys( 360 grpc::ClientContext* context, 361 google::bigtable::v2::SampleRowKeysRequest const& request) { 362 return MakeUnique<SampleRowKeysResponse>(this); 363 } 364 std::unique_ptr< 365 grpc::ClientReaderInterface<google::bigtable::v2::MutateRowsResponse>> 366 BigtableTestClient::MutateRows( 367 grpc::ClientContext* context, 368 google::bigtable::v2::MutateRowsRequest const& request) { 369 mutex_lock l(mu_); 370 for (auto i = request.entries().begin(); i != request.entries().end(); ++i) { 371 auto* row = &table_.rows[string(i->row_key())]; 372 for (auto mut = i->mutations().begin(); mut != i->mutations().end(); 373 ++mut) { 374 UpdateRow(*mut, &row->columns); 375 } 376 } 377 return MakeUnique<MutateRowsResponse>(request.entries_size()); 378 } 379 380 std::unique_ptr<grpc::ClientAsyncResponseReaderInterface< 381 google::bigtable::v2::MutateRowResponse>> 382 BigtableTestClient::AsyncMutateRow( 383 grpc::ClientContext* context, 384 google::bigtable::v2::MutateRowRequest const& request, 385 grpc::CompletionQueue* cq) { 386 LOG(WARNING) << "Call to InMemoryDataClient::" << __func__ 387 << "(); this will likely cause a crash!"; 388 return nullptr; 389 } 390 391 std::unique_ptr<::grpc::ClientAsyncReaderInterface< 392 ::google::bigtable::v2::SampleRowKeysResponse>> 393 BigtableTestClient::AsyncSampleRowKeys( 394 ::grpc::ClientContext* context, 395 const ::google::bigtable::v2::SampleRowKeysRequest& request, 396 ::grpc::CompletionQueue* cq, void* tag) { 397 LOG(WARNING) << "Call to InMemoryDataClient::" << __func__ 398 << "(); this will likely cause a crash!"; 399 return nullptr; 400 } 401 402 std::unique_ptr<::grpc::ClientAsyncReaderInterface< 403 ::google::bigtable::v2::MutateRowsResponse>> 404 BigtableTestClient::AsyncMutateRows( 405 ::grpc::ClientContext* context, 406 const ::google::bigtable::v2::MutateRowsRequest& request, 407 ::grpc::CompletionQueue* cq, void* tag) { 408 LOG(WARNING) << "Call to InMemoryDataClient::" << __func__ 409 << "(); this will likely cause a crash!"; 410 return nullptr; 411 } 412 413 std::unique_ptr<grpc::ClientAsyncResponseReaderInterface< 414 google::bigtable::v2::CheckAndMutateRowResponse>> 415 BigtableTestClient::AsyncCheckAndMutateRow( 416 grpc::ClientContext* context, 417 const google::bigtable::v2::CheckAndMutateRowRequest& request, 418 grpc::CompletionQueue* cq) { 419 LOG(WARNING) << "Call to InMemoryDataClient::" << __func__ 420 << "(); this will likely cause a crash!"; 421 return nullptr; 422 } 423 424 std::unique_ptr< 425 grpc::ClientAsyncReaderInterface<google::bigtable::v2::ReadRowsResponse>> 426 BigtableTestClient::AsyncReadRows( 427 grpc::ClientContext* context, 428 const google::bigtable::v2::ReadRowsRequest& request, 429 grpc::CompletionQueue* cq, void* tag) { 430 LOG(WARNING) << "Call to InMemoryDataClient::" << __func__ 431 << "(); this will likely cause a crash!"; 432 return nullptr; 433 } 434 435 std::shared_ptr<grpc::Channel> BigtableTestClient::Channel() { 436 LOG(WARNING) << "Call to InMemoryDataClient::Channel(); this will likely " 437 "cause a crash!"; 438 return nullptr; 439 } 440 } // namespace tensorflow 441