Skip to content

Commit

Permalink
[env] Add test cases in flink-env test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
masteryhx committed Apr 1, 2024
1 parent de9582b commit f64498f
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 16 deletions.
19 changes: 13 additions & 6 deletions env/flink/env_flink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class FlinkWritableFile : public FSWritableFile {
jobject fsDataOutputStream = jniEnv->CallObjectMethod(
file_system_instance_, fileSystemCreateMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
if (fsDataOutputStream == nullptr) {
if (fsDataOutputStream == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string(
"CallObjectMethod Exception when Init FlinkWritableFile, ")
Expand Down Expand Up @@ -193,7 +193,7 @@ class FlinkReadableFile : virtual public FSSequentialFile,
jobject fsDataInputStream = jniEnv->CallObjectMethod(
file_system_instance_, openMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
if (fsDataInputStream == nullptr) {
if (fsDataInputStream == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string(
"CallObjectMethod Exception when Init FlinkReadableFile, ")
Expand Down Expand Up @@ -355,7 +355,7 @@ Status FlinkFileSystem::Init() {
jobject fileSystemInstance = jniEnv->CallStaticObjectMethod(
fileSystemClass.javaClass, fileSystemGetMethod.javaMethod, uriInstance);
jniEnv->DeleteLocalRef(uriInstance);
if (fileSystemInstance == nullptr) {
if (fileSystemInstance == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string(
"CallStaticObjectMethod Exception when Init FlinkFileSystem, ")
Expand Down Expand Up @@ -504,7 +504,7 @@ IOStatus FlinkFileSystem::GetChildren(const std::string& file_name,
auto fileStatusArray = (jobjectArray)jniEnv->CallObjectMethod(
file_system_instance_, listStatusMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
if (fileStatusArray == nullptr) {
if (fileStatusArray == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string("Exception when CallObjectMethod in GetChildren, ")
.append(listStatusMethod.ToString())
Expand All @@ -516,7 +516,7 @@ IOStatus FlinkFileSystem::GetChildren(const std::string& file_name,
jsize fileStatusArrayLen = jniEnv->GetArrayLength(fileStatusArray);
for (jsize i = 0; i < fileStatusArrayLen; i++) {
jobject fileStatusObj = jniEnv->GetObjectArrayElement(fileStatusArray, i);
if (fileStatusObj == nullptr) {
if (fileStatusObj == nullptr || jniEnv->ExceptionCheck()) {
jniEnv->DeleteLocalRef(fileStatusArray);
return CheckThenError(
"Exception when GetObjectArrayElement in GetChildren");
Expand All @@ -527,7 +527,7 @@ IOStatus FlinkFileSystem::GetChildren(const std::string& file_name,
jobject subPath =
jniEnv->CallObjectMethod(fileStatusObj, getPathMethod.javaMethod);
jniEnv->DeleteLocalRef(fileStatusObj);
if (subPath == nullptr) {
if (subPath == nullptr || jniEnv->ExceptionCheck()) {
jniEnv->DeleteLocalRef(fileStatusArray);
return CheckThenError(
std::string("Exception when CallObjectMethod in GetChildren, ")
Expand All @@ -539,6 +539,13 @@ IOStatus FlinkFileSystem::GetChildren(const std::string& file_name,
auto subPathStr = (jstring)jniEnv->CallObjectMethod(
subPath, pathToStringMethod.javaMethod);
jniEnv->DeleteLocalRef(subPath);
if (subPathStr == nullptr || jniEnv->ExceptionCheck()) {
jniEnv->DeleteLocalRef(fileStatusArray);
return CheckThenError(
std::string("Exception when CallObjectMethod in GetChildren, ")
.append(pathToStringMethod.ToString()));
}

const char* str = jniEnv->GetStringUTFChars(subPathStr, nullptr);
result->emplace_back(str);
jniEnv->ReleaseStringUTFChars(subPathStr, str);
Expand Down
122 changes: 113 additions & 9 deletions env/flink/env_flink_test_suite.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,24 @@
std::abort(); \
}

#define LOG(message) (std::cout << (message) << std::endl)

namespace ROCKSDB_NAMESPACE {

EnvFlinkTestSuites::EnvFlinkTestSuites(const std::string& basePath)
: base_path_(basePath) {}

void EnvFlinkTestSuites::runAllTestSuites() {
setUp();
testFileExist();
LOG("Stage 1: setUp OK");
testDirOperation();
LOG("Stage 2: testDirOperation OK");
testFileOperation();
LOG("Stage 3: testFileOperation OK");
testGetChildren();
LOG("Stage 4: testGetChildren OK");
testFileReadAndWrite();
LOG("Stage 5: testFileReadAndWrite OK");
}

void EnvFlinkTestSuites::setUp() {
Expand All @@ -45,22 +55,116 @@ void EnvFlinkTestSuites::setUp() {
}
}

void EnvFlinkTestSuites::testFileExist() {
std::string fileName("test-file");
Status result = flink_env_->FileExists(fileName);
ASSERT_TRUE(result.IsNotFound());
void EnvFlinkTestSuites::testDirOperation() {
const std::string dir_name = "test-dir";
ASSERT_TRUE(flink_env_->FileExists(dir_name).IsNotFound());
ASSERT_TRUE(flink_env_->CreateDir(dir_name).ok());
ASSERT_TRUE(flink_env_->CreateDirIfMissing(dir_name).ok());
ASSERT_TRUE(!flink_env_->CreateDir(dir_name).ok());

bool is_dir;
ASSERT_TRUE(flink_env_->IsDirectory(dir_name, &is_dir).ok() && is_dir);
ASSERT_TRUE(flink_env_->FileExists(dir_name).ok());
ASSERT_TRUE(flink_env_->DeleteDir(dir_name).ok());
ASSERT_TRUE(flink_env_->FileExists(dir_name).IsNotFound());
}

void EnvFlinkTestSuites::testFileOperation() {
const std::string file_name = "test-file";
const std::string not_exist_file_name = "not-exist-file";

// test file exists
ASSERT_TRUE(flink_env_->FileExists(file_name).IsNotFound());
generateFile(file_name);
ASSERT_TRUE(flink_env_->FileExists(file_name).ok());

// test file status
uint64_t file_size, file_mtime;
ASSERT_TRUE(flink_env_->GetFileSize(file_name, &file_size).ok());
ASSERT_TRUE(!flink_env_->GetFileSize(not_exist_file_name, &file_size).ok());
ASSERT_TRUE(file_size > 0);
ASSERT_TRUE(flink_env_->GetFileModificationTime(file_name, &file_mtime).ok());
ASSERT_TRUE(
!flink_env_->GetFileModificationTime(not_exist_file_name, &file_mtime)
.ok());
ASSERT_TRUE(file_mtime > 0);

// test renaming file
const std::string file_name_2 = "test-file-2";
flink_env_->RenameFile(file_name, file_name_2);
ASSERT_TRUE(flink_env_->FileExists(file_name).IsNotFound());
ASSERT_TRUE(flink_env_->FileExists(file_name_2).ok());
ASSERT_TRUE(flink_env_->DeleteFile(file_name_2).ok());
ASSERT_TRUE(flink_env_->FileExists(file_name_2).IsNotFound());
}

void EnvFlinkTestSuites::testGetChildren() {
const std::string dir_name = "test-dir";
const std::string sub_dir_name = dir_name + "/test-sub-dir";
const std::string file_name_1 = dir_name + "/test-file-1";
const std::string file_name_2 = dir_name + "/test-file-2";
ASSERT_TRUE(flink_env_->CreateDirIfMissing(dir_name).ok());
ASSERT_TRUE(flink_env_->CreateDirIfMissing(sub_dir_name).ok());
generateFile(file_name_1);
generateFile(file_name_2);

std::vector<std::string> result,
expected{base_path_ + sub_dir_name, base_path_ + file_name_1,
base_path_ + file_name_2};
ASSERT_TRUE(flink_env_->GetChildren(dir_name, &result).ok());
ASSERT_TRUE(result.size() == 3);
std::sort(result.begin(), result.end());
std::sort(expected.begin(), expected.end());
ASSERT_TRUE(expected == result);
}

void EnvFlinkTestSuites::testFileReadAndWrite() {
const std::string file_name = "test-file";
const std::string content1 = "Hello World", content2 = ", Hello ForSt",
content = content1 + content2;

std::unique_ptr<WritableFile> write_result;
ASSERT_TRUE(
flink_env_->NewWritableFile(file_name, &write_result, EnvOptions()).ok());
write_result->Append(content1);
write_result->Append(content2);
write_result->Sync();
write_result->Flush();
write_result->Close();

std::unique_ptr<SequentialFile> sequential_result;
ASSERT_TRUE(
flink_env_->NewSequentialFile(file_name, &sequential_result, EnvOptions())
.ok());
Slice sequential_data;
std::string sequential_scratch;
sequential_result->Skip(content1.size());
sequential_result->Read(content2.size(), &sequential_data,
(char*)sequential_scratch.data());
ASSERT_TRUE(sequential_data.data() == content2);

std::unique_ptr<RandomAccessFile> random_access_result;
ASSERT_TRUE(
flink_env_
->NewRandomAccessFile(file_name, &random_access_result, EnvOptions())
.ok());
Slice random_access_data;
std::string random_access_scratch;
random_access_result->Read(content1.size(), content.size() - content1.size(),
&random_access_data,
(char*)random_access_scratch.data());
ASSERT_TRUE(random_access_data.data() == content2);
}

void EnvFlinkTestSuites::generateFile(const std::string& fileName) {
// Generate a file manually
const std::string prefix = "file:";
std::string writeFileName = base_path_ + fileName;
if (writeFileName.compare(0, prefix.size(), prefix) == 0) {
writeFileName = writeFileName.substr(prefix.size());
}
std::ofstream writeFile(writeFileName);
writeFile << "testFileExist";
writeFile << "Hello World";
writeFile.close();

result = flink_env_->FileExists(fileName);
ASSERT_TRUE(result.ok());
}
} // namespace ROCKSDB_NAMESPACE
7 changes: 6 additions & 1 deletion env/flink/env_flink_test_suite.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class EnvFlinkTestSuites {
std::unique_ptr<ROCKSDB_NAMESPACE::Env> flink_env_;
const std::string base_path_;
void setUp();
void testFileExist();
void testDirOperation();
void testFileOperation();
void testGetChildren();
void testFileReadAndWrite();

void generateFile(const std::string& fileName);
};
} // namespace ROCKSDB_NAMESPACE

0 comments on commit f64498f

Please sign in to comment.