Skip to content

Commit

Permalink
apacheGH-43060: [C++] Limit buffer size in BufferedInputStream::SetBu…
Browse files Browse the repository at this point in the history
…fferSize with raw_read_bound (apache#43064)

### Rationale for this change

See apache#43060. This patch optimizes memory usage for buffering.

### What changes are included in this PR?

Limit the `buffer_size_` used by `SetBufferSize`

### Are these changes tested?

Yes

### Are there any user-facing changes?

No

* GitHub Issue: apache#43060

Authored-by: mwish <[email protected]>
Signed-off-by: mwish <[email protected]>
  • Loading branch information
mapleFU authored Jun 27, 2024
1 parent 6680dcf commit 1da71ba
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
9 changes: 9 additions & 0 deletions cpp/src/arrow/io/buffered.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,20 @@ class BufferedInputStream::Impl : public BufferedBase {
return raw_pos_ - bytes_buffered_;
}

// Resize the internal read buffer to `new_buffer_size` bytes.
//
// When a raw read bound is set (raw_read_bound_ >= 0), the requested size is
// clamped so that the buffer is not larger than what can still be needed: the
// region up to the end of the currently buffered data plus the bytes that may
// still be read from the raw stream.
//
// Returns Status::Invalid if `new_buffer_size` is not positive, or if it does
// not extend past the end of the currently buffered data; otherwise returns
// the result of ResizeBuffer().
Status SetBufferSize(int64_t new_buffer_size) {
  if (new_buffer_size <= 0) {
    return Status::Invalid("Buffer size should be positive");
  }
  if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) {
    return Status::Invalid("Cannot shrink read buffer if buffered data remains");
  }
  if (raw_read_bound_ >= 0) {
    // No need to reserve space for more than the total remaining number of bytes.
    const int64_t raw_bytes_remaining = raw_read_bound_ - raw_read_total_;
    if (bytes_buffered_ == 0) {
      // No unread data is buffered, so only the bytes still readable from the
      // raw stream need room.
      new_buffer_size = std::min(new_buffer_size, raw_bytes_remaining);
    } else {
      // The shrink check above treats buffer_pos_ + bytes_buffered_ as the end
      // of the buffered data. Clamping to bytes_buffered_ + remaining alone
      // could drop below that end when buffer_pos_ > 0 and cut off unread
      // bytes, so the clamp must account for buffer_pos_ as well.
      new_buffer_size = std::min(
          new_buffer_size, buffer_pos_ + bytes_buffered_ + raw_bytes_remaining);
    }
  }
  return ResizeBuffer(new_buffer_size);
}

Expand Down Expand Up @@ -433,6 +440,8 @@ class BufferedInputStream::Impl : public BufferedBase {
private:
std::shared_ptr<InputStream> raw_;
int64_t raw_read_total_;
// A bound on the maximum number of bytes to read from the raw input stream.
// The default, -1, indicates that reads are unbounded.
int64_t raw_read_bound_;

// Number of remaining bytes in the buffer, to be reduced on each read from
Expand Down
23 changes: 21 additions & 2 deletions cpp/src/arrow/io/buffered_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ class TestBufferedInputStream : public FileTestFixture<BufferedInputStream> {
local_pool_ = MemoryPool::CreateDefault();
}

void MakeExample1(int64_t buffer_size, MemoryPool* pool = default_memory_pool()) {
void MakeExample1(int64_t buffer_size, MemoryPool* pool = default_memory_pool(),
int64_t raw_read_bound = -1) {
test_data_ = kExample1;

ASSERT_OK_AND_ASSIGN(auto file_out, FileOutputStream::Open(path_));
Expand All @@ -338,7 +339,8 @@ class TestBufferedInputStream : public FileTestFixture<BufferedInputStream> {

ASSERT_OK_AND_ASSIGN(auto file_in, ReadableFile::Open(path_));
raw_ = file_in;
ASSERT_OK_AND_ASSIGN(buffered_, BufferedInputStream::Create(buffer_size, pool, raw_));
ASSERT_OK_AND_ASSIGN(
buffered_, BufferedInputStream::Create(buffer_size, pool, raw_, raw_read_bound));
}

protected:
Expand Down Expand Up @@ -472,6 +474,23 @@ TEST_F(TestBufferedInputStream, SetBufferSize) {
ASSERT_OK(buffered_->SetBufferSize(5));
}

// GH-43060: the internal buffer should not be larger than the number of bytes
// that can still be buffered.
TEST_F(TestBufferedInputStream, BufferSizeLimit) {
  const int64_t kLargeBufferSize = 100000;
  const int64_t kRawReadBound = 15;
  const int64_t kBytesRead = 10;

  // Case 1: a buffer size requested at creation should not exceed
  // raw_read_bound.
  MakeExample1(/*buffer_size=*/kLargeBufferSize, default_memory_pool(),
               /*raw_read_bound=*/kRawReadBound);
  EXPECT_EQ(kRawReadBound, buffered_->buffer_size());

  // Case 2: set a large buffer size after part of the bounded input has been
  // read; the expected size is the bound minus the bytes already read.
  MakeExample1(/*buffer_size=*/kBytesRead, default_memory_pool(),
               /*raw_read_bound=*/kRawReadBound);
  ASSERT_OK(buffered_->Read(kBytesRead));
  ASSERT_OK(buffered_->SetBufferSize(/*new_buffer_size=*/kLargeBufferSize));
  EXPECT_EQ(kRawReadBound - kBytesRead, buffered_->buffer_size());
}

class TestBufferedInputStreamBound : public ::testing::Test {
public:
void SetUp() { CreateExample(/*bounded=*/true); }
Expand Down

0 comments on commit 1da71ba

Please sign in to comment.