Home | History | Annotate | Download | only in hadoop
      1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #include "tensorflow/core/platform/hadoop/hadoop_file_system.h"
     17 
     18 #include "tensorflow/core/lib/core/status_test_util.h"
     19 #include "tensorflow/core/lib/gtl/stl_util.h"
     20 #include "tensorflow/core/lib/io/path.h"
     21 #include "tensorflow/core/platform/file_system.h"
     22 #include "tensorflow/core/platform/test.h"
     23 
     24 namespace tensorflow {
     25 namespace {
     26 
     27 class HadoopFileSystemTest : public ::testing::Test {
     28  protected:
     29   HadoopFileSystemTest() {}
     30 
     31   string TmpDir(const string& path) {
     32     char* test_dir = getenv("HADOOP_TEST_TMPDIR");
     33     if (test_dir != nullptr) {
     34       return io::JoinPath(string(test_dir), path);
     35     } else {
     36       return "file://" + io::JoinPath(testing::TmpDir(), path);
     37     }
     38   }
     39 
     40   Status WriteString(const string& fname, const string& content) {
     41     std::unique_ptr<WritableFile> writer;
     42     TF_RETURN_IF_ERROR(hdfs.NewWritableFile(fname, &writer));
     43     TF_RETURN_IF_ERROR(writer->Append(content));
     44     TF_RETURN_IF_ERROR(writer->Close());
     45     return Status::OK();
     46   }
     47 
     48   Status ReadAll(const string& fname, string* content) {
     49     std::unique_ptr<RandomAccessFile> reader;
     50     TF_RETURN_IF_ERROR(hdfs.NewRandomAccessFile(fname, &reader));
     51 
     52     uint64 file_size = 0;
     53     TF_RETURN_IF_ERROR(hdfs.GetFileSize(fname, &file_size));
     54 
     55     content->resize(file_size);
     56     StringPiece result;
     57     TF_RETURN_IF_ERROR(
     58         reader->Read(0, file_size, &result, gtl::string_as_array(content)));
     59     if (file_size != result.size()) {
     60       return errors::DataLoss("expected ", file_size, " got ", result.size(),
     61                               " bytes");
     62     }
     63     return Status::OK();
     64   }
     65 
     66   HadoopFileSystem hdfs;
     67 };
     68 
     69 TEST_F(HadoopFileSystemTest, RandomAccessFile) {
     70   const string fname = TmpDir("RandomAccessFile");
     71   const string content = "abcdefghijklmn";
     72   TF_ASSERT_OK(WriteString(fname, content));
     73 
     74   std::unique_ptr<RandomAccessFile> reader;
     75   TF_EXPECT_OK(hdfs.NewRandomAccessFile(fname, &reader));
     76 
     77   string got;
     78   got.resize(content.size());
     79   StringPiece result;
     80   TF_EXPECT_OK(
     81       reader->Read(0, content.size(), &result, gtl::string_as_array(&got)));
     82   EXPECT_EQ(content.size(), result.size());
     83   EXPECT_EQ(content, result);
     84 
     85   got.clear();
     86   got.resize(4);
     87   TF_EXPECT_OK(reader->Read(2, 4, &result, gtl::string_as_array(&got)));
     88   EXPECT_EQ(4, result.size());
     89   EXPECT_EQ(content.substr(2, 4), result);
     90 }
     91 
     92 TEST_F(HadoopFileSystemTest, WritableFile) {
     93   std::unique_ptr<WritableFile> writer;
     94   const string fname = TmpDir("WritableFile");
     95   TF_EXPECT_OK(hdfs.NewWritableFile(fname, &writer));
     96   TF_EXPECT_OK(writer->Append("content1,"));
     97   TF_EXPECT_OK(writer->Append("content2"));
     98   TF_EXPECT_OK(writer->Flush());
     99   TF_EXPECT_OK(writer->Sync());
    100   TF_EXPECT_OK(writer->Close());
    101 
    102   string content;
    103   TF_EXPECT_OK(ReadAll(fname, &content));
    104   EXPECT_EQ("content1,content2", content);
    105 }
    106 
    107 TEST_F(HadoopFileSystemTest, FileExists) {
    108   const string fname = TmpDir("FileExists");
    109   EXPECT_EQ(error::Code::NOT_FOUND, hdfs.FileExists(fname).code());
    110   TF_ASSERT_OK(WriteString(fname, "test"));
    111   TF_EXPECT_OK(hdfs.FileExists(fname));
    112 }
    113 
    114 TEST_F(HadoopFileSystemTest, GetChildren) {
    115   const string base = TmpDir("GetChildren");
    116   TF_EXPECT_OK(hdfs.CreateDir(base));
    117 
    118   const string file = io::JoinPath(base, "testfile.csv");
    119   TF_EXPECT_OK(WriteString(file, "blah"));
    120   const string subdir = io::JoinPath(base, "subdir");
    121   TF_EXPECT_OK(hdfs.CreateDir(subdir));
    122 
    123   std::vector<string> children;
    124   TF_EXPECT_OK(hdfs.GetChildren(base, &children));
    125   std::sort(children.begin(), children.end());
    126   EXPECT_EQ(std::vector<string>({"subdir", "testfile.csv"}), children);
    127 }
    128 
    129 TEST_F(HadoopFileSystemTest, DeleteFile) {
    130   const string fname = TmpDir("DeleteFile");
    131   EXPECT_FALSE(hdfs.DeleteFile(fname).ok());
    132   TF_ASSERT_OK(WriteString(fname, "test"));
    133   TF_EXPECT_OK(hdfs.DeleteFile(fname));
    134 }
    135 
    136 TEST_F(HadoopFileSystemTest, GetFileSize) {
    137   const string fname = TmpDir("GetFileSize");
    138   TF_ASSERT_OK(WriteString(fname, "test"));
    139   uint64 file_size = 0;
    140   TF_EXPECT_OK(hdfs.GetFileSize(fname, &file_size));
    141   EXPECT_EQ(4, file_size);
    142 }
    143 
    144 TEST_F(HadoopFileSystemTest, CreateDirStat) {
    145   const string dir = TmpDir("CreateDirStat");
    146   TF_EXPECT_OK(hdfs.CreateDir(dir));
    147   FileStatistics stat;
    148   TF_EXPECT_OK(hdfs.Stat(dir, &stat));
    149   EXPECT_TRUE(stat.is_directory);
    150 }
    151 
    152 TEST_F(HadoopFileSystemTest, DeleteDir) {
    153   const string dir = TmpDir("DeleteDir");
    154   EXPECT_FALSE(hdfs.DeleteDir(dir).ok());
    155   TF_EXPECT_OK(hdfs.CreateDir(dir));
    156   TF_EXPECT_OK(hdfs.DeleteDir(dir));
    157   FileStatistics stat;
    158   EXPECT_FALSE(hdfs.Stat(dir, &stat).ok());
    159 }
    160 
    161 TEST_F(HadoopFileSystemTest, RenameFile) {
    162   const string fname1 = TmpDir("RenameFile1");
    163   const string fname2 = TmpDir("RenameFile2");
    164   TF_ASSERT_OK(WriteString(fname1, "test"));
    165   TF_EXPECT_OK(hdfs.RenameFile(fname1, fname2));
    166   string content;
    167   TF_EXPECT_OK(ReadAll(fname2, &content));
    168   EXPECT_EQ("test", content);
    169 }
    170 
    171 TEST_F(HadoopFileSystemTest, RenameFile_Overwrite) {
    172   const string fname1 = TmpDir("RenameFile1");
    173   const string fname2 = TmpDir("RenameFile2");
    174 
    175   TF_ASSERT_OK(WriteString(fname2, "test"));
    176   TF_EXPECT_OK(hdfs.FileExists(fname2));
    177 
    178   TF_ASSERT_OK(WriteString(fname1, "test"));
    179   TF_EXPECT_OK(hdfs.RenameFile(fname1, fname2));
    180   string content;
    181   TF_EXPECT_OK(ReadAll(fname2, &content));
    182   EXPECT_EQ("test", content);
    183 }
    184 
    185 TEST_F(HadoopFileSystemTest, StatFile) {
    186   const string fname = TmpDir("StatFile");
    187   TF_ASSERT_OK(WriteString(fname, "test"));
    188   FileStatistics stat;
    189   TF_EXPECT_OK(hdfs.Stat(fname, &stat));
    190   EXPECT_EQ(4, stat.length);
    191   EXPECT_FALSE(stat.is_directory);
    192 }
    193 
    194 TEST_F(HadoopFileSystemTest, WriteWhileReading) {
    195   std::unique_ptr<WritableFile> writer;
    196   const string fname = TmpDir("WriteWhileReading");
    197   // Skip the test if we're not testing on HDFS. Hadoop's local filesystem
    198   // implementation makes no guarantees that writable files are readable while
    199   // being written.
    200   if (!StringPiece(fname).starts_with("hdfs://")) {
    201     return;
    202   }
    203 
    204   TF_EXPECT_OK(hdfs.NewWritableFile(fname, &writer));
    205 
    206   const string content1 = "content1";
    207   TF_EXPECT_OK(writer->Append(content1));
    208   TF_EXPECT_OK(writer->Flush());
    209 
    210   std::unique_ptr<RandomAccessFile> reader;
    211   TF_EXPECT_OK(hdfs.NewRandomAccessFile(fname, &reader));
    212 
    213   string got;
    214   got.resize(content1.size());
    215   StringPiece result;
    216   TF_EXPECT_OK(
    217       reader->Read(0, content1.size(), &result, gtl::string_as_array(&got)));
    218   EXPECT_EQ(content1, result);
    219 
    220   string content2 = "content2";
    221   TF_EXPECT_OK(writer->Append(content2));
    222   TF_EXPECT_OK(writer->Flush());
    223 
    224   got.resize(content2.size());
    225   TF_EXPECT_OK(reader->Read(content1.size(), content2.size(), &result,
    226                             gtl::string_as_array(&got)));
    227   EXPECT_EQ(content2, result);
    228 
    229   TF_EXPECT_OK(writer->Close());
    230 }
    231 
    232 // NewAppendableFile() is not testable. Local filesystem maps to
    233 // ChecksumFileSystem in Hadoop, where appending is an unsupported operation.
    234 
    235 }  // namespace
    236 }  // namespace tensorflow
    237