Skip to content

Commit

Permalink
AsyncDiskFileChunk tune-ups
Browse files Browse the repository at this point in the history
Summary:
- more error checking
- more consistent error checks
- avoid auto
- reworked Windows open file modes

Reviewed By: jtbraun

Differential Revision: D60366655

fbshipit-source-id: 958b5a1c10a98e02e36738c7dd088c3069cb6fd6
  • Loading branch information
Georges Berenger authored and facebook-github-bot committed Jul 31, 2024
1 parent 5130a4d commit 0e9acd4
Showing 1 changed file with 77 additions and 76 deletions.
153 changes: 77 additions & 76 deletions vrs/AsyncDiskFileChunk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,50 +73,51 @@ struct AsyncWindowsHandle {
// O_DIRECT is roughly equivalent to (FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH)
// We always open with FILE_FLAG_OVERLAPPED

DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE; // no sharing of read, write or delete
DWORD dwDesiredAccess = 0;

bool badmode = false;
bool rdwr = false;
for (size_t i = 1; modes[i]; ++i) {
switch (modes[i]) {
case 'b':
// Linux has no O_BINARY
break;
case '+':
rdwr = true;
break;

default:
badmode = true;
if (modes[0] != 0) {
for (size_t i = 1; modes[i] != 0; ++i) {
switch (modes[i]) {
case 'b':
// do nothing: binary mode is the only mode available
break;
case '+':
dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
break;

default:
badmode = true;
}
}
}

DWORD dwDesiredAccess = 0;
DWORD dwCreationDisposition = 0;

DWORD dwShareMode = 0;
int whence = SEEK_SET;
switch (modes[0]) {
case 'r':
dwCreationDisposition = rdwr ? OPEN_ALWAYS : OPEN_EXISTING;
dwDesiredAccess |= rdwr ? (GENERIC_READ | GENERIC_WRITE) : GENERIC_READ;
dwCreationDisposition = dwDesiredAccess == 0 ? OPEN_EXISTING : OPEN_ALWAYS;
dwDesiredAccess |= GENERIC_READ;
dwShareMode = FILE_SHARE_READ;
break;
case 'w':
dwCreationDisposition = CREATE_ALWAYS;
dwDesiredAccess |= rdwr ? (GENERIC_READ | GENERIC_WRITE) : GENERIC_WRITE;

dwDesiredAccess |= GENERIC_WRITE;
break;
case 'a':
// dwDesiredAccess |= rdwr ? (GENERIC_READ | GENERIC_WRITE) : GENERIC_WRITE;
// flags |= O_APPEND;
// whence = rdwr ? SEEK_END : SEEK_SET;
// break;
[[fallthrough]]; // not supported yet
dwCreationDisposition = OPEN_ALWAYS;
dwDesiredAccess |= GENERIC_WRITE;
dwShareMode = FILE_SHARE_READ;
whence = SEEK_END;
break;
default:
badmode = true;
}

if (badmode) {
return -1;
XR_LOGCE(VRS_DISKFILECHUNK, "Unsupported open mode: '%s'", modes);
return INVALID_PARAMETER;
}

DWORD dwFlagsAndAttributes = FILE_FLAG_OVERLAPPED;
Expand Down Expand Up @@ -145,9 +146,9 @@ struct AsyncWindowsHandle {
if (!isOpened()) {
return SUCCESS;
}
auto h = h_;
HANDLE h = h_;
h_ = INVALID_HANDLE_VALUE;
auto error = CloseHandle(h) ? SUCCESS : GetLastError();
int error = CloseHandle(h) ? SUCCESS : GetLastError();

return error;
}
Expand Down Expand Up @@ -186,7 +187,7 @@ struct AsyncWindowsHandle {
}
}

auto error = GetLastError();
int error = GetLastError();
if (error != ERROR_IO_PENDING) {
return error;
}
Expand Down Expand Up @@ -252,17 +253,17 @@ struct AsyncWindowsHandle {
using AsyncHandle = AsyncWindowsHandle;
#else
struct AsyncFileDescriptor {
static constexpr int INVALID = -1;
static constexpr int INVALID_FILE_DESCRIPTOR = -1;

AsyncFileDescriptor() = default;
explicit AsyncFileDescriptor(int fd) : fd_(fd) {}
AsyncFileDescriptor(AsyncFileDescriptor&& rhs) noexcept : fd_(rhs.fd_) {
rhs.fd_ = INVALID;
rhs.fd_ = INVALID_FILE_DESCRIPTOR;
}
AsyncFileDescriptor(const AsyncFileDescriptor& rhs) noexcept = delete;
AsyncFileDescriptor& operator=(AsyncFileDescriptor&& rhs) noexcept {
fd_ = rhs.fd_;
rhs.fd_ = INVALID;
rhs.fd_ = INVALID_FILE_DESCRIPTOR;
return *this;
}
AsyncFileDescriptor& operator=(const AsyncFileDescriptor& rhs) = delete;
Expand All @@ -271,29 +272,31 @@ struct AsyncFileDescriptor {
return fd_ == fd;
}

int open(const std::string& path, const char* mode, int flags) {
int open(const std::string& path, const char* modes, int flags) {
assert(!isOpened());

int permissions = 0666;

bool badmode = false;
bool rdwr = false;
for (size_t i = 1; mode[i]; ++i) {
switch (mode[i]) {
case 'b':
// Linux has no O_BINARY
break;
case '+':
rdwr = true;
break;

default:
badmode = true;
if (modes[0] != 0) {
for (size_t i = 1; modes[i] != 0; ++i) {
switch (modes[i]) {
case 'b':
// Linux has no O_BINARY
break;
case '+':
rdwr = true;
break;

default:
badmode = true;
}
}
}

int whence = SEEK_SET;
switch (mode[0]) {
switch (modes[0]) {
case 'r':
flags |= rdwr ? O_RDWR : O_RDONLY;
break;
Expand All @@ -311,14 +314,16 @@ struct AsyncFileDescriptor {
}

if (badmode) {
return -1;
XR_LOGCE(VRS_DISKFILECHUNK, "Unsupported open mode: '%s'", modes);
return INVALID_PARAMETER;
}
int newFd = ::open(path.c_str(), flags, permissions);
if (newFd >= 0) {
if (::lseek64(newFd, 0, whence) == (off64_t)-1) {
::close(newFd);
return errno;
}
if (newFd < 0) {
return errno;
}
if (::lseek64(newFd, 0, whence) < 0) {
::close(newFd);
return errno;
}
fd_ = newFd;
return SUCCESS;
Expand All @@ -329,7 +334,7 @@ struct AsyncFileDescriptor {
}

int read(void* ptr, size_t bufferSize, size_t offset, size_t& outReadSize) {
auto ret = ::pread(fd_, ptr, bufferSize, offset);
ssize_t ret = ::pread(fd_, ptr, bufferSize, offset);
if (ret < 0) {
outReadSize = 0;
return errno;
Expand All @@ -353,8 +358,8 @@ struct AsyncFileDescriptor {
}

int seek(int64_t pos, int origin, int64_t& outFilepos) {
auto result = ::lseek64(fd_, pos, origin);
if (result == (off_t)-1) {
off64_t result = ::lseek64(fd_, pos, origin);
if (result < 0) {
outFilepos = 0;
return errno;
} else {
Expand All @@ -364,14 +369,13 @@ struct AsyncFileDescriptor {
}

int pwrite(const void* buf, size_t count, off_t offset, size_t& written) {
auto result = ::pwrite(fd_, buf, count, offset);
ssize_t result = ::pwrite(fd_, buf, count, offset);
written = result;
if (result != count) {
if (result < 0) {
written = 0;
return errno;
}

return DISKFILE_PARTIAL_WRITE_ERROR;
}
return SUCCESS;
Expand All @@ -381,12 +385,12 @@ struct AsyncFileDescriptor {
if (fd_ < 0) {
return SUCCESS;
}
auto fd = fd_;
fd_ = -1;
int fd = fd_;
fd_ = INVALID_FILE_DESCRIPTOR;
return ::close(fd);
}

int fd_ = -1;
int fd_ = INVALID_FILE_DESCRIPTOR;
};
using AsyncHandle = AsyncFileDescriptor;
#endif
Expand Down Expand Up @@ -462,7 +466,7 @@ class AlignedBuffer {
[[nodiscard]] ssize_t add(const void* buffer, size_t size) {
assert(size);

auto capacity = this->capacity();
size_t capacity = this->capacity();
if (capacity == 0) {
return -1;
}
Expand Down Expand Up @@ -678,7 +682,7 @@ class AsyncDiskFileChunk {
file_mode_ = "wb+";

IF_ERROR_RETURN(init_parameters(options));
auto error = ensureOpenDirect();
int error = ensureOpenDirect();
if (error != 0 && 0 != (O_DIRECT & supported_flags_)) {
error = ensureOpenNonDirect();
if (error == 0) {
Expand Down Expand Up @@ -716,17 +720,14 @@ class AsyncDiskFileChunk {
return SUCCESS;
}

auto error = flushWriteBuffer();
int error = flushWriteBuffer();

// Release the write buffers, if any. File chunking is a rare enough event that it's not worth
// trying to move these to the next currentChunk.
free_write_buffers();

auto error2 = file_.close();
if (error == 0) {
error = error2;
}
return error;
int error2 = file_.close();
return error != 0 ? error : error2;
}

int rewind() {
Expand Down Expand Up @@ -815,7 +816,7 @@ class AsyncDiskFileChunk {

while (count != 0) {
// This data is aligned to lenalign, so cache it in the current_buffer_
auto additionalBuffered = current_buffer_->add(bbuffer, count);
ssize_t additionalBuffered = current_buffer_->add(bbuffer, count);
if (additionalBuffered <= 0) {
return DISKFILE_PARTIAL_WRITE_ERROR;
}
Expand Down Expand Up @@ -853,7 +854,7 @@ class AsyncDiskFileChunk {
}
case IoEngine::PSync: {
size_t thiswritten = 0;
auto err = file_.pwrite(
int err = file_.pwrite(
current_buffer_->data(), current_buffer_->size(), file_position_, thiswritten);
// There's no need to release this buffer, as we've already written it. Save a fetch
// later
Expand All @@ -866,7 +867,7 @@ class AsyncDiskFileChunk {
}
default:
XR_LOGCE(VRS_DISKFILECHUNK, "Unhandled ioengine");
return -1;
return VRSERROR_INTERNAL_ERROR;
}
}
}
Expand Down Expand Up @@ -899,7 +900,7 @@ class AsyncDiskFileChunk {
IF_ERROR_RETURN(flushWriteBuffer());
IF_ERROR_RETURN(ensureOpenNonDirect());

auto error = file_.read(buffer, count, file_position_, outReadSize);
int error = file_.read(buffer, count, file_position_, outReadSize);
file_position_ += outReadSize;
return error;
}
Expand Down Expand Up @@ -970,14 +971,14 @@ class AsyncDiskFileChunk {
// default construction and swapping.
if (!buffers_.empty()) {
std::unique_lock lock{buffers_mutex_};
auto expected_free = buffers_.size() - (current_buffer_ ? 1 : 0);
size_t expected_free = buffers_.size() - (current_buffer_ ? 1 : 0);
while (buffers_free_.size() != expected_free) {
// N.B. as buffers are freed they pump the queue to completion.
buffer_freed_cv_.wait(
lock, [this, expected_free] { return buffers_free_.size() == expected_free; });
}

auto async_error = async_error_.exchange(0);
int async_error = async_error_.exchange(0);
if (async_error != 0) {
XR_LOGCE(VRS_DISKFILECHUNK, "Returning async error on flush {}", async_error);
return async_error;
Expand All @@ -988,12 +989,12 @@ class AsyncDiskFileChunk {
IF_ERROR_RETURN(ensureOpenNonDirect());

while (current_buffer_) {
auto towrite = current_buffer_->size();
size_t towrite = current_buffer_->size();

// if we've gotten here we're flushing, so just pwrite() the contents, don't bother being
// fast about it
size_t thiswritten = 0;
auto error = file_.pwrite(current_buffer_->data(), towrite, file_position_, thiswritten);
int error = file_.pwrite(current_buffer_->data(), towrite, file_position_, thiswritten);
free_buffer(current_buffer_);
if (error != 0) {
return error;
Expand Down Expand Up @@ -1047,7 +1048,7 @@ class AsyncDiskFileChunk {
IF_ERROR_RETURN(alloc_write_buffers());
}

auto error = file_.open(path_, mode, requested_flags);
int error = file_.open(path_, mode, requested_flags);
if (error != 0) {
close();
return error;
Expand Down Expand Up @@ -1087,7 +1088,7 @@ class AsyncDiskFileChunk {
}

if (io_errno != SUCCESS) {
auto current_error = async_error_.load();
int current_error = async_error_.load();
while (current_error == SUCCESS &&
!async_error_.compare_exchange_weak(current_error, io_errno)) {
}
Expand Down

0 comments on commit 0e9acd4

Please sign in to comment.