diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc
index c53b3d223d4c0..b41e4257af215 100644
--- a/cpp/src/arrow/io/buffered.cc
+++ b/cpp/src/arrow/io/buffered.cc
@@ -282,6 +282,8 @@ class BufferedInputStream::Impl : public BufferedBase {
     return raw_pos_ - bytes_buffered_;
   }
 
+  // Resize the internal read buffer. Note that the internal buffer size
+  // should not be larger than raw_read_bound_.
   Status SetBufferSize(int64_t new_buffer_size) {
     if (new_buffer_size <= 0) {
       return Status::Invalid("Buffer size should be positive");
@@ -289,6 +291,15 @@ class BufferedInputStream::Impl : public BufferedBase {
     if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) {
       return Status::Invalid("Cannot shrink read buffer if buffered data remains");
     }
+    if (raw_read_bound_ >= 0) {
+      // No need to reserve space for more than the bytes that can still be read.
+      // The shrink check above treats buffer_pos_ + bytes_buffered_ as the end of
+      // the buffered data, so the cap must include buffer_pos_ as well; capping at
+      // bytes_buffered_ + remaining alone could cut the buffer short of that end.
+      const int64_t raw_remaining = raw_read_bound_ - raw_read_total_;
+      new_buffer_size =
+          std::min(new_buffer_size, buffer_pos_ + bytes_buffered_ + raw_remaining);
+    }
     return ResizeBuffer(new_buffer_size);
   }
 
@@ -433,6 +444,8 @@ class BufferedInputStream::Impl : public BufferedBase {
  private:
   std::shared_ptr<InputStream> raw_;
   int64_t raw_read_total_;
+  // A bound on the maximum number of bytes to read from the raw input stream.
+  // The default -1 indicates that it is unbounded.
   int64_t raw_read_bound_;
 
   // Number of remaining bytes in the buffer, to be reduced on each read from
diff --git a/cpp/src/arrow/io/buffered_test.cc b/cpp/src/arrow/io/buffered_test.cc
index 82feeea0517fd..cbf2c2cf06938 100644
--- a/cpp/src/arrow/io/buffered_test.cc
+++ b/cpp/src/arrow/io/buffered_test.cc
@@ -329,7 +329,8 @@ class TestBufferedInputStream : public FileTestFixture {
     local_pool_ = MemoryPool::CreateDefault();
   }
 
-  void MakeExample1(int64_t buffer_size, MemoryPool* pool = default_memory_pool()) {
+  void MakeExample1(int64_t buffer_size, MemoryPool* pool = default_memory_pool(),
+                    int64_t raw_read_bound = -1) {
     test_data_ = kExample1;
 
     ASSERT_OK_AND_ASSIGN(auto file_out, FileOutputStream::Open(path_));
@@ -338,7 +339,8 @@ class TestBufferedInputStream : public FileTestFixture {
     ASSERT_OK_AND_ASSIGN(auto file_in, ReadableFile::Open(path_));
     raw_ = file_in;
 
-    ASSERT_OK_AND_ASSIGN(buffered_, BufferedInputStream::Create(buffer_size, pool, raw_));
+    ASSERT_OK_AND_ASSIGN(
+        buffered_, BufferedInputStream::Create(buffer_size, pool, raw_, raw_read_bound));
   }
 
  protected:
@@ -472,6 +474,23 @@ TEST_F(TestBufferedInputStream, SetBufferSize) {
   ASSERT_OK(buffered_->SetBufferSize(5));
 }
 
+// GH-43060: The internal buffer should not be larger than the
+// number of bytes that can still be buffered.
+TEST_F(TestBufferedInputStream, BufferSizeLimit) {
+  {
+    // Buffer size should not exceed raw_read_bound.
+    MakeExample1(/*buffer_size=*/100000, default_memory_pool(), /*raw_read_bound=*/15);
+    EXPECT_EQ(15, buffered_->buffer_size());
+  }
+  {
+    // Set a buffer size after a read.
+    MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*raw_read_bound=*/15);
+    ASSERT_OK(buffered_->Read(10));
+    ASSERT_OK(buffered_->SetBufferSize(/*new_buffer_size=*/100000));
+    EXPECT_EQ(5, buffered_->buffer_size());
+  }
+}
+
 class TestBufferedInputStreamBound : public ::testing::Test {
  public:
   void SetUp() { CreateExample(/*bounded=*/true); }