1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/core/platform/hadoop/hadoop_file_system.h" 17 18 #include "tensorflow/core/lib/core/status_test_util.h" 19 #include "tensorflow/core/lib/gtl/stl_util.h" 20 #include "tensorflow/core/lib/io/path.h" 21 #include "tensorflow/core/platform/file_system.h" 22 #include "tensorflow/core/platform/test.h" 23 24 namespace tensorflow { 25 namespace { 26 27 class HadoopFileSystemTest : public ::testing::Test { 28 protected: 29 HadoopFileSystemTest() {} 30 31 string TmpDir(const string& path) { 32 char* test_dir = getenv("HADOOP_TEST_TMPDIR"); 33 if (test_dir != nullptr) { 34 return io::JoinPath(string(test_dir), path); 35 } else { 36 return "file://" + io::JoinPath(testing::TmpDir(), path); 37 } 38 } 39 40 Status WriteString(const string& fname, const string& content) { 41 std::unique_ptr<WritableFile> writer; 42 TF_RETURN_IF_ERROR(hdfs.NewWritableFile(fname, &writer)); 43 TF_RETURN_IF_ERROR(writer->Append(content)); 44 TF_RETURN_IF_ERROR(writer->Close()); 45 return Status::OK(); 46 } 47 48 Status ReadAll(const string& fname, string* content) { 49 std::unique_ptr<RandomAccessFile> reader; 50 TF_RETURN_IF_ERROR(hdfs.NewRandomAccessFile(fname, &reader)); 51 52 uint64 file_size = 0; 53 TF_RETURN_IF_ERROR(hdfs.GetFileSize(fname, &file_size)); 54 55 content->resize(file_size); 56 StringPiece result; 57 TF_RETURN_IF_ERROR( 58 reader->Read(0, file_size, &result, gtl::string_as_array(content))); 59 if (file_size != result.size()) { 60 return errors::DataLoss("expected ", file_size, " got ", result.size(), 61 " bytes"); 62 } 63 return Status::OK(); 64 } 65 66 HadoopFileSystem hdfs; 67 }; 68 69 TEST_F(HadoopFileSystemTest, RandomAccessFile) { 70 const string fname = TmpDir("RandomAccessFile"); 71 const string content = "abcdefghijklmn"; 72 TF_ASSERT_OK(WriteString(fname, content)); 73 74 std::unique_ptr<RandomAccessFile> reader; 75 TF_EXPECT_OK(hdfs.NewRandomAccessFile(fname, &reader)); 76 77 string got; 78 got.resize(content.size()); 79 StringPiece result; 80 TF_EXPECT_OK( 81 reader->Read(0, content.size(), &result, gtl::string_as_array(&got))); 82 EXPECT_EQ(content.size(), result.size()); 83 EXPECT_EQ(content, result); 84 85 got.clear(); 86 got.resize(4); 87 TF_EXPECT_OK(reader->Read(2, 4, &result, gtl::string_as_array(&got))); 88 EXPECT_EQ(4, result.size()); 89 EXPECT_EQ(content.substr(2, 4), result); 90 } 91 92 TEST_F(HadoopFileSystemTest, WritableFile) { 93 std::unique_ptr<WritableFile> writer; 94 const string fname = TmpDir("WritableFile"); 95 TF_EXPECT_OK(hdfs.NewWritableFile(fname, &writer)); 96 TF_EXPECT_OK(writer->Append("content1,")); 97 TF_EXPECT_OK(writer->Append("content2")); 98 TF_EXPECT_OK(writer->Flush()); 99 TF_EXPECT_OK(writer->Sync()); 100 TF_EXPECT_OK(writer->Close()); 101 102 string content; 103 TF_EXPECT_OK(ReadAll(fname, &content)); 104 EXPECT_EQ("content1,content2", content); 105 } 106 107 TEST_F(HadoopFileSystemTest, FileExists) { 108 const string fname = TmpDir("FileExists"); 109 EXPECT_EQ(error::Code::NOT_FOUND, hdfs.FileExists(fname).code()); 110 TF_ASSERT_OK(WriteString(fname, "test")); 111 TF_EXPECT_OK(hdfs.FileExists(fname)); 112 } 113 114 TEST_F(HadoopFileSystemTest, GetChildren) { 115 const string base = TmpDir("GetChildren"); 116 TF_EXPECT_OK(hdfs.CreateDir(base)); 117 118 const string file = io::JoinPath(base, "testfile.csv"); 119 TF_EXPECT_OK(WriteString(file, "blah")); 120 const string subdir = io::JoinPath(base, "subdir"); 121 TF_EXPECT_OK(hdfs.CreateDir(subdir)); 122 123 std::vector<string> children; 124 TF_EXPECT_OK(hdfs.GetChildren(base, &children)); 125 std::sort(children.begin(), children.end()); 126 EXPECT_EQ(std::vector<string>({"subdir", "testfile.csv"}), children); 127 } 128 129 TEST_F(HadoopFileSystemTest, DeleteFile) { 130 const string fname = TmpDir("DeleteFile"); 131 EXPECT_FALSE(hdfs.DeleteFile(fname).ok()); 132 TF_ASSERT_OK(WriteString(fname, "test")); 133 TF_EXPECT_OK(hdfs.DeleteFile(fname)); 134 } 135 136 TEST_F(HadoopFileSystemTest, GetFileSize) { 137 const string fname = TmpDir("GetFileSize"); 138 TF_ASSERT_OK(WriteString(fname, "test")); 139 uint64 file_size = 0; 140 TF_EXPECT_OK(hdfs.GetFileSize(fname, &file_size)); 141 EXPECT_EQ(4, file_size); 142 } 143 144 TEST_F(HadoopFileSystemTest, CreateDirStat) { 145 const string dir = TmpDir("CreateDirStat"); 146 TF_EXPECT_OK(hdfs.CreateDir(dir)); 147 FileStatistics stat; 148 TF_EXPECT_OK(hdfs.Stat(dir, &stat)); 149 EXPECT_TRUE(stat.is_directory); 150 } 151 152 TEST_F(HadoopFileSystemTest, DeleteDir) { 153 const string dir = TmpDir("DeleteDir"); 154 EXPECT_FALSE(hdfs.DeleteDir(dir).ok()); 155 TF_EXPECT_OK(hdfs.CreateDir(dir)); 156 TF_EXPECT_OK(hdfs.DeleteDir(dir)); 157 FileStatistics stat; 158 EXPECT_FALSE(hdfs.Stat(dir, &stat).ok()); 159 } 160 161 TEST_F(HadoopFileSystemTest, RenameFile) { 162 const string fname1 = TmpDir("RenameFile1"); 163 const string fname2 = TmpDir("RenameFile2"); 164 TF_ASSERT_OK(WriteString(fname1, "test")); 165 TF_EXPECT_OK(hdfs.RenameFile(fname1, fname2)); 166 string content; 167 TF_EXPECT_OK(ReadAll(fname2, &content)); 168 EXPECT_EQ("test", content); 169 } 170 171 TEST_F(HadoopFileSystemTest, RenameFile_Overwrite) { 172 const string fname1 = TmpDir("RenameFile1"); 173 const string fname2 = TmpDir("RenameFile2"); 174 175 TF_ASSERT_OK(WriteString(fname2, "test")); 176 TF_EXPECT_OK(hdfs.FileExists(fname2)); 177 178 TF_ASSERT_OK(WriteString(fname1, "test")); 179 TF_EXPECT_OK(hdfs.RenameFile(fname1, fname2)); 180 string content; 181 TF_EXPECT_OK(ReadAll(fname2, &content)); 182 EXPECT_EQ("test", content); 183 } 184 185 TEST_F(HadoopFileSystemTest, StatFile) { 186 const string fname = TmpDir("StatFile"); 187 TF_ASSERT_OK(WriteString(fname, "test")); 188 FileStatistics stat; 189 TF_EXPECT_OK(hdfs.Stat(fname, &stat)); 190 EXPECT_EQ(4, stat.length); 191 EXPECT_FALSE(stat.is_directory); 192 } 193 194 TEST_F(HadoopFileSystemTest, WriteWhileReading) { 195 std::unique_ptr<WritableFile> writer; 196 const string fname = TmpDir("WriteWhileReading"); 197 // Skip the test if we're not testing on HDFS. Hadoop's local filesystem 198 // implementation makes no guarantees that writable files are readable while 199 // being written. 200 if (!StringPiece(fname).starts_with("hdfs://")) { 201 return; 202 } 203 204 TF_EXPECT_OK(hdfs.NewWritableFile(fname, &writer)); 205 206 const string content1 = "content1"; 207 TF_EXPECT_OK(writer->Append(content1)); 208 TF_EXPECT_OK(writer->Flush()); 209 210 std::unique_ptr<RandomAccessFile> reader; 211 TF_EXPECT_OK(hdfs.NewRandomAccessFile(fname, &reader)); 212 213 string got; 214 got.resize(content1.size()); 215 StringPiece result; 216 TF_EXPECT_OK( 217 reader->Read(0, content1.size(), &result, gtl::string_as_array(&got))); 218 EXPECT_EQ(content1, result); 219 220 string content2 = "content2"; 221 TF_EXPECT_OK(writer->Append(content2)); 222 TF_EXPECT_OK(writer->Flush()); 223 224 got.resize(content2.size()); 225 TF_EXPECT_OK(reader->Read(content1.size(), content2.size(), &result, 226 gtl::string_as_array(&got))); 227 EXPECT_EQ(content2, result); 228 229 TF_EXPECT_OK(writer->Close()); 230 } 231 232 // NewAppendableFile() is not testable. Local filesystem maps to 233 // ChecksumFileSystem in Hadoop, where appending is an unsupported operation. 234 235 } // namespace 236 } // namespace tensorflow 237