1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/contrib/cloud/kernels/bigquery_table_accessor.h" 17 #include "tensorflow/contrib/cloud/kernels/bigquery_table_accessor_test_data.h" 18 #include "tensorflow/core/example/feature.pb.h" 19 #include "tensorflow/core/lib/core/status_test_util.h" 20 #include "tensorflow/core/lib/gtl/stl_util.h" 21 #include "tensorflow/core/platform/cloud/http_request_fake.h" 22 #include "tensorflow/core/platform/test.h" 23 24 namespace tensorflow { 25 namespace { 26 27 constexpr char kTestProject[] = "test-project"; 28 constexpr char kTestDataset[] = "test-dataset"; 29 constexpr char kTestTable[] = "test-table"; 30 31 bool HasSubstr(const string& base, const string& substr) { 32 bool ok = StringPiece(base).contains(substr); 33 EXPECT_TRUE(ok) << base << ", expected substring " << substr; 34 return ok; 35 } 36 37 class FakeAuthProvider : public AuthProvider { 38 public: 39 Status GetToken(string* token) override { 40 *token = "fake_token"; 41 return Status::OK(); 42 } 43 }; 44 45 string DeterministicSerialization(const tensorflow::Example& example) { 46 const std::size_t size = example.ByteSizeLong(); 47 string result(size, '\0'); 48 ::tensorflow::protobuf::io::ArrayOutputStream array_stream( 49 gtl::string_as_array(&result), size); 50 ::tensorflow::protobuf::io::CodedOutputStream output_stream(&array_stream); 51 52 output_stream.SetSerializationDeterministic(true); 53 example.SerializeWithCachedSizes(&output_stream); 54 EXPECT_FALSE(output_stream.HadError()); 55 EXPECT_EQ(size, output_stream.ByteCount()); 56 return result; 57 } 58 59 } // namespace 60 61 class BigQueryTableAccessorTest : public ::testing::Test { 62 protected: 63 BigQueryTableAccessor::SchemaNode GetSchema() { 64 return accessor_->schema_root_; 65 } 66 67 Status CreateTableAccessor(const string& project_id, const string& dataset_id, 68 const string& table_id, int64 timestamp_millis, 69 int64 row_buffer_size, 70 const std::vector<string>& columns, 71 const BigQueryTablePartition& partition) { 72 return BigQueryTableAccessor::New( 73 project_id, dataset_id, table_id, timestamp_millis, row_buffer_size, "", 74 columns, partition, std::unique_ptr<AuthProvider>(new FakeAuthProvider), 75 std::unique_ptr<HttpRequest::Factory>( 76 new FakeHttpRequestFactory(&requests_)), 77 &accessor_); 78 } 79 80 std::vector<HttpRequest*> requests_; 81 std::unique_ptr<BigQueryTableAccessor> accessor_; 82 }; 83 84 TEST_F(BigQueryTableAccessorTest, NegativeTimestamp) { 85 const auto status = 86 CreateTableAccessor(kTestProject, kTestDataset, kTestTable, -1, 3, {}, 87 BigQueryTablePartition()); 88 EXPECT_TRUE(errors::IsInvalidArgument(status)); 89 } 90 91 TEST_F(BigQueryTableAccessorTest, ZeroTimestamp) { 92 const auto status = 93 CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 0, 3, {}, 94 BigQueryTablePartition()); 95 EXPECT_TRUE(errors::IsInvalidArgument(status)); 96 } 97 98 TEST_F(BigQueryTableAccessorTest, RepeatedFieldNoAllowedTest) { 99 requests_.emplace_back(new FakeHttpRequest( 100 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 101 "datasets/test-dataset/tables/test-table/\n" 102 "Auth Token: fake_token\n", 103 R"({ 104 "kind": "bigquery#table", 105 "etag": "\"4zcX32ezvFoFzxHoG04qJqKZk6c/MTQ1Nzk3NTgwNzE4Mw\"", 106 "id": "test-project:test-dataset.test-table", 107 "schema": { 108 "fields": [ 109 { 110 "name": "int_field", 111 "type": "INTEGER", 112 "mode": "REPEATED" 113 }] 114 }, 115 "numRows": "10" 116 })")); 117 const auto status = 118 CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 3, {}, 119 BigQueryTablePartition()); 120 EXPECT_TRUE(errors::IsUnimplemented(status)); 121 EXPECT_TRUE(HasSubstr(status.error_message(), 122 "Tables with repeated columns are not supported")); 123 } 124 125 TEST_F(BigQueryTableAccessorTest, ValidSchemaTest) { 126 requests_.emplace_back(new FakeHttpRequest( 127 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 128 "datasets/test-dataset/tables/test-table/\n" 129 "Auth Token: fake_token\n", 130 kSampleSchema)); 131 TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 3, 132 {}, BigQueryTablePartition())); 133 // Validate total number of rows. 134 EXPECT_EQ(4, accessor_->total_num_rows()); 135 136 // Validate the schema. 137 const auto schema_root = GetSchema(); 138 EXPECT_EQ(schema_root.name, ""); 139 EXPECT_EQ(schema_root.type, BigQueryTableAccessor::ColumnType::kNone); 140 EXPECT_EQ(9, schema_root.schema_nodes.size()); 141 142 EXPECT_EQ(schema_root.schema_nodes[0].name, "int_field"); 143 EXPECT_EQ(schema_root.schema_nodes[0].type, 144 BigQueryTableAccessor::ColumnType::kInteger); 145 146 EXPECT_EQ(schema_root.schema_nodes[1].name, "str_field"); 147 EXPECT_EQ(schema_root.schema_nodes[1].type, 148 BigQueryTableAccessor::ColumnType::kString); 149 150 EXPECT_EQ(1, schema_root.schema_nodes[2].schema_nodes.size()); 151 EXPECT_EQ(schema_root.schema_nodes[2].name, "rec_field"); 152 EXPECT_EQ(schema_root.schema_nodes[2].type, 153 BigQueryTableAccessor::ColumnType::kRecord); 154 155 EXPECT_EQ(schema_root.schema_nodes[2].schema_nodes[0].name, 156 "rec_field.float_field"); 157 EXPECT_EQ(schema_root.schema_nodes[2].schema_nodes[0].type, 158 BigQueryTableAccessor::ColumnType::kFloat); 159 160 EXPECT_EQ(schema_root.schema_nodes[3].name, "bool_field"); 161 EXPECT_EQ(schema_root.schema_nodes[3].type, 162 BigQueryTableAccessor::ColumnType::kBoolean); 163 164 EXPECT_EQ(schema_root.schema_nodes[4].name, "bytes_field"); 165 EXPECT_EQ(schema_root.schema_nodes[4].type, 166 BigQueryTableAccessor::ColumnType::kBytes); 167 168 EXPECT_EQ(schema_root.schema_nodes[5].name, "timestamp_field"); 169 EXPECT_EQ(schema_root.schema_nodes[5].type, 170 BigQueryTableAccessor::ColumnType::kTimestamp); 171 172 EXPECT_EQ(schema_root.schema_nodes[6].name, "date_field"); 173 EXPECT_EQ(schema_root.schema_nodes[6].type, 174 BigQueryTableAccessor::ColumnType::kDate); 175 176 EXPECT_EQ(schema_root.schema_nodes[7].name, "time_field"); 177 EXPECT_EQ(schema_root.schema_nodes[7].type, 178 BigQueryTableAccessor::ColumnType::kTime); 179 180 EXPECT_EQ(schema_root.schema_nodes[8].name, "datetime_field"); 181 EXPECT_EQ(schema_root.schema_nodes[8].type, 182 BigQueryTableAccessor::ColumnType::kDatetime); 183 } 184 185 TEST_F(BigQueryTableAccessorTest, ReadOneRowTest) { 186 requests_.emplace_back(new FakeHttpRequest( 187 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 188 "datasets/test-dataset/tables/test-table/\n" 189 "Auth Token: fake_token\n", 190 kSampleSchema)); 191 requests_.emplace_back(new FakeHttpRequest( 192 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 193 "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n" 194 "Auth Token: fake_token\n", 195 kTestRow)); 196 BigQueryTablePartition partition; 197 partition.set_start_index(2); 198 partition.set_end_index(2); 199 TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1, 200 {}, partition)); 201 int64 row_id; 202 Example example; 203 TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example)); 204 205 // Validate returned result. 206 Example expected_example; 207 ASSERT_TRUE(protobuf::TextFormat::ParseFromString(kTestExampleProto, 208 &expected_example)); 209 EXPECT_EQ(DeterministicSerialization(expected_example), 210 DeterministicSerialization(example)); 211 EXPECT_EQ(row_id, 2); 212 EXPECT_TRUE(accessor_->Done()); 213 } 214 215 TEST_F(BigQueryTableAccessorTest, ReadOneRowPartialTest) { 216 requests_.emplace_back(new FakeHttpRequest( 217 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 218 "datasets/test-dataset/tables/test-table/\n" 219 "Auth Token: fake_token\n", 220 kSampleSchema)); 221 requests_.emplace_back(new FakeHttpRequest( 222 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 223 "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n" 224 "Auth Token: fake_token\n", 225 kTestRow)); 226 BigQueryTablePartition partition; 227 partition.set_start_index(2); 228 partition.set_end_index(2); 229 TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1, 230 {"bool_field", "rec_field.float_field"}, 231 partition)); 232 int64 row_id; 233 Example example; 234 TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example)); 235 236 // Validate returned result. 237 EXPECT_EQ(row_id, 2); 238 EXPECT_TRUE(accessor_->Done()); 239 Example expected_example; 240 ASSERT_TRUE(protobuf::TextFormat::ParseFromString(kTestPartialExampleProto, 241 &expected_example)); 242 EXPECT_EQ(DeterministicSerialization(expected_example), 243 DeterministicSerialization(example)); 244 } 245 246 TEST_F(BigQueryTableAccessorTest, ReadOneRowWithNullsTest) { 247 requests_.emplace_back(new FakeHttpRequest( 248 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 249 "datasets/test-dataset/tables/test-table/\n" 250 "Auth Token: fake_token\n", 251 kSampleSchema)); 252 requests_.emplace_back(new FakeHttpRequest( 253 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 254 "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n" 255 "Auth Token: fake_token\n", 256 kTestRowWithNulls)); 257 BigQueryTablePartition partition; 258 partition.set_start_index(2); 259 partition.set_end_index(2); 260 TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1, 261 {}, partition)); 262 int64 row_id; 263 Example example; 264 TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example)); 265 266 // Validate returned result. 267 Example expected_example; 268 ASSERT_TRUE(protobuf::TextFormat::ParseFromString(kTestExampleProtoWithNulls, 269 &expected_example)); 270 EXPECT_EQ(DeterministicSerialization(expected_example), 271 DeterministicSerialization(example)); 272 EXPECT_EQ(row_id, 2); 273 EXPECT_TRUE(accessor_->Done()); 274 } 275 276 TEST_F(BigQueryTableAccessorTest, ReadOneRowTwoRecords) { 277 requests_.emplace_back(new FakeHttpRequest( 278 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 279 "datasets/test-dataset/tables/test-table/\n" 280 "Auth Token: fake_token\n", 281 kSampleSchemaTwoRecords)); 282 requests_.emplace_back(new FakeHttpRequest( 283 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 284 "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n" 285 "Auth Token: fake_token\n", 286 kTestRowWithTwoRecords)); 287 BigQueryTablePartition partition; 288 partition.set_start_index(2); 289 partition.set_end_index(2); 290 TF_EXPECT_OK(CreateTableAccessor( 291 kTestProject, kTestDataset, kTestTable, 1, 1, 292 {"rec_field2.bool_field", "rec_field1.float_field"}, partition)); 293 294 int64 row_id; 295 Example example; 296 TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example)); 297 298 // Validate returned result. 299 Example expected_example; 300 ASSERT_TRUE(protobuf::TextFormat::ParseFromString( 301 kTestExampleProtoWithTwoRecords, &expected_example)); 302 EXPECT_EQ(DeterministicSerialization(expected_example), 303 DeterministicSerialization(example)); 304 EXPECT_EQ(row_id, 2); 305 EXPECT_TRUE(accessor_->Done()); 306 } 307 308 TEST_F(BigQueryTableAccessorTest, NonExistentColumns) { 309 requests_.emplace_back(new FakeHttpRequest( 310 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 311 "datasets/test-dataset/tables/test-table/\n" 312 "Auth Token: fake_token\n", 313 kSampleSchemaTwoRecords)); 314 requests_.emplace_back(new FakeHttpRequest( 315 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 316 "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n" 317 "Auth Token: fake_token\n", 318 kTestRowWithTwoRecords)); 319 BigQueryTablePartition partition; 320 partition.set_start_index(2); 321 partition.set_end_index(2); 322 TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1, 323 {"bool_field", "float_field"}, partition)); 324 int64 row_id; 325 Example example; 326 TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example)); 327 328 // Validate returned result. 329 EXPECT_EQ(row_id, 2); 330 EXPECT_TRUE(accessor_->Done()); 331 } 332 333 TEST_F(BigQueryTableAccessorTest, EmptyRow) { 334 requests_.emplace_back(new FakeHttpRequest( 335 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 336 "datasets/test-dataset/tables/test-table/\n" 337 "Auth Token: fake_token\n", 338 kSampleSchemaTwoRecords)); 339 requests_.emplace_back(new FakeHttpRequest( 340 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 341 "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n" 342 "Auth Token: fake_token\n", 343 kTestEmptyRow)); 344 BigQueryTablePartition partition; 345 partition.set_start_index(2); 346 partition.set_end_index(2); 347 TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1, 348 {}, partition)); 349 int64 row_id; 350 Example example; 351 TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example)); 352 353 // Validate returned result. 354 EXPECT_EQ(row_id, 2); 355 EXPECT_TRUE(accessor_->Done()); 356 } 357 358 TEST_F(BigQueryTableAccessorTest, BrokenRowTest) { 359 requests_.emplace_back(new FakeHttpRequest( 360 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 361 "datasets/test-dataset/tables/test-table/\n" 362 "Auth Token: fake_token\n", 363 kSampleSchema)); 364 requests_.emplace_back(new FakeHttpRequest( 365 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 366 "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n" 367 "Auth Token: fake_token\n", 368 kBrokenTestRow)); 369 BigQueryTablePartition partition; 370 partition.set_start_index(2); 371 partition.set_end_index(2); 372 TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1, 373 {}, partition)); 374 int64 row_id; 375 Example example; 376 const auto status = accessor_->ReadRow(&row_id, &example); 377 EXPECT_TRUE(errors::IsInternal(status)); 378 EXPECT_TRUE( 379 HasSubstr(status.error_message(), "Cannot convert value to integer")); 380 } 381 382 TEST_F(BigQueryTableAccessorTest, MultiplePagesTest) { 383 requests_.emplace_back(new FakeHttpRequest( 384 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 385 "datasets/test-dataset/tables/test-table/\n" 386 "Auth Token: fake_token\n", 387 kSampleSchema)); 388 requests_.emplace_back(new FakeHttpRequest( 389 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 390 "datasets/test-dataset/tables/test-table/data?maxResults=2&startIndex=1\n" 391 "Auth Token: fake_token\n", 392 kTestTwoRows)); 393 requests_.emplace_back(new FakeHttpRequest( 394 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 395 "datasets/test-dataset/tables/test-table/" 396 "data?maxResults=2&pageToken=next_page\n" 397 "Auth Token: fake_token\n", 398 kTestRowWithNulls)); 399 400 BigQueryTablePartition partition; 401 partition.set_start_index(1); 402 partition.set_end_index(-1); 403 TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 2, 404 {}, partition)); 405 406 int64 row_id; 407 Example example; 408 TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example)); 409 EXPECT_EQ(1, row_id); 410 EXPECT_FALSE(accessor_->Done()); 411 EXPECT_EQ( 412 (example.features().feature()).at("int_field").int64_list().value(0), 413 1111); 414 415 TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example)); 416 EXPECT_EQ(2, row_id); 417 EXPECT_FALSE(accessor_->Done()); 418 EXPECT_EQ(example.features().feature().at("int_field").int64_list().value(0), 419 2222); 420 421 TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example)); 422 EXPECT_EQ(3, row_id); 423 EXPECT_TRUE(accessor_->Done()); 424 425 Example expected_example; 426 ASSERT_TRUE(protobuf::TextFormat::ParseFromString(kTestExampleProtoWithNulls, 427 &expected_example)); 428 EXPECT_EQ(DeterministicSerialization(expected_example), 429 DeterministicSerialization(example)); 430 EXPECT_TRUE(errors::IsOutOfRange(accessor_->ReadRow(&row_id, &example))); 431 } 432 433 TEST_F(BigQueryTableAccessorTest, SwitchingPartitionsTest) { 434 requests_.emplace_back(new FakeHttpRequest( 435 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 436 "datasets/test-dataset/tables/test-table/\n" 437 "Auth Token: fake_token\n", 438 kSampleSchema)); 439 requests_.emplace_back(new FakeHttpRequest( 440 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 441 "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=0\n" 442 "Auth Token: fake_token\n", 443 kTestTwoRows)); 444 requests_.emplace_back(new FakeHttpRequest( 445 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 446 "datasets/test-dataset/tables/test-table/" 447 "data?maxResults=2&startIndex=3\n" 448 "Auth Token: fake_token\n", 449 kTestRowWithNulls)); 450 requests_.emplace_back(new FakeHttpRequest( 451 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 452 "datasets/test-dataset/tables/test-table/data?maxResults=2&startIndex=0\n" 453 "Auth Token: fake_token\n", 454 kTestTwoRows)); 455 456 BigQueryTablePartition partition; 457 partition.set_start_index(0); 458 partition.set_end_index(0); 459 TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 2, 460 {}, partition)); 461 462 int64 row_id; 463 Example example; 464 TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example)); 465 EXPECT_EQ(0, row_id); 466 EXPECT_TRUE(accessor_->Done()); 467 EXPECT_EQ(example.features().feature().at("int_field").int64_list().value(0), 468 1111); 469 470 partition.set_start_index(3); 471 partition.set_end_index(-1); 472 TF_EXPECT_OK(accessor_->SetPartition(partition)); 473 TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example)); 474 EXPECT_EQ(3, row_id); 475 EXPECT_TRUE(accessor_->Done()); 476 EXPECT_EQ(example.features().feature().at("int_field").int64_list().value(0), 477 1234); 478 479 partition.set_start_index(0); 480 partition.set_end_index(1); 481 TF_EXPECT_OK(accessor_->SetPartition(partition)); 482 TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example)); 483 EXPECT_EQ(0, row_id); 484 EXPECT_FALSE(accessor_->Done()); 485 EXPECT_EQ(example.features().feature().at("int_field").int64_list().value(0), 486 1111); 487 TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example)); 488 EXPECT_EQ(1, row_id); 489 EXPECT_TRUE(accessor_->Done()); 490 EXPECT_EQ(example.features().feature().at("int_field").int64_list().value(0), 491 2222); 492 } 493 494 TEST_F(BigQueryTableAccessorTest, EmptyPartitionTest) { 495 requests_.emplace_back(new FakeHttpRequest( 496 "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/" 497 "datasets/test-dataset/tables/test-table/\n" 498 "Auth Token: fake_token\n", 499 kSampleSchema)); 500 501 BigQueryTablePartition partition; 502 partition.set_start_index(3); 503 partition.set_end_index(2); 504 TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1, 505 {}, partition)); 506 EXPECT_TRUE(accessor_->Done()); 507 508 int64 row_id; 509 Example example; 510 EXPECT_TRUE(errors::IsOutOfRange(accessor_->ReadRow(&row_id, &example))); 511 } 512 513 } // namespace tensorflow 514