Skip to content

Commit

Permalink
spawn for data stream
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Dec 7, 2023
1 parent 05970f6 commit 035a879
Show file tree
Hide file tree
Showing 18 changed files with 367 additions and 193 deletions.
55 changes: 39 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1416,7 +1416,7 @@ pub mod default {
}

pub fn object_store_read_timeout_ms() -> u64 {
60 * 60 * 1000
300 * 1000
}

pub mod s3 {
Expand Down
1 change: 0 additions & 1 deletion src/compute/src/memory/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ pub fn storage_memory_config(
);

let total_calculated_mb = block_cache_capacity_mb
+ large_query_memory_usage_mb
+ meta_cache_capacity_mb
+ shared_buffer_capacity_mb
+ data_file_cache_ring_buffer_capacity_mb
Expand Down
2 changes: 1 addition & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ recent_filter_rotate_interval_ms = 10000
object_store_streaming_read_timeout_ms = 600000
object_store_streaming_upload_timeout_ms = 600000
object_store_upload_timeout_ms = 3600000
object_store_read_timeout_ms = 3600000
object_store_read_timeout_ms = 300000

[storage.object_store.s3]
object_store_keepalive_ms = 600000
Expand Down
13 changes: 7 additions & 6 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,9 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
};

try_update_failure_metric(&self.object_store_metrics, &res, operation_type);
if let Err(e) = &res {
tracing::error!("read failed because of: {:?}", e);
}

let data = res?;
self.object_store_metrics
Expand Down Expand Up @@ -648,12 +651,10 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
.operation_latency
.with_label_values(&[media_type, operation_type])
.start_timer();
let future = async {
self.inner
.streaming_read(path, range)
.verbose_instrument_await("object_store_streaming_read")
.await
};
let future = self
.inner
.streaming_read(path, range)
.verbose_instrument_await("object_store_streaming_read");
let res = match self.streaming_read_timeout.as_ref() {
None => future.await,
Some(timeout) => tokio::time::timeout(*timeout, future)
Expand Down
34 changes: 11 additions & 23 deletions src/storage/src/hummock/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,13 @@ impl BlockDataStream {
pub fn new(
// The stream that provides raw data.
byte_stream: MonitoredStreamingReader,

// Index of the SST's block where the stream starts.
block_index: usize,

// Meta data of the SST that is streamed.
metas: &[BlockMeta],
block_metas: Vec<BlockMeta>,
) -> Self {
// Avoids panicking if `block_index` is too large.
let block_index = std::cmp::min(block_index, metas.len());

Self {
buf_reader: byte_stream,
block_idx: 0,
block_metas: metas[block_index..].to_vec(),
block_metas,
buf: Bytes::default(),
buff_offset: 0,
}
Expand All @@ -93,6 +86,7 @@ impl BlockDataStream {
fail_point!("stream_read_err", |_| Err(HummockError::object_io_error(
ObjectError::internal("stream read error")
)));
let uncompressed_size = block_meta.uncompressed_size as usize;
let end = self.buff_offset + block_meta.len as usize;
let data = if end > self.buf.len() {
let current_block = self.read_next_buf(block_meta.len as usize).await?;
Expand All @@ -104,9 +98,8 @@ impl BlockDataStream {
data
};

let block_idx = self.block_idx;
self.block_idx += 1;
Ok(Some((data, block_idx)))
Ok(Some((data, uncompressed_size)))
}

async fn read_next_buf(&mut self, read_size: usize) -> HummockResult<Bytes> {
Expand Down Expand Up @@ -143,10 +136,9 @@ impl BlockDataStream {
pub async fn next_block(&mut self) -> HummockResult<Option<Box<Block>>> {
match self.next_block_impl().await? {
None => Ok(None),
Some((buf, block_idx)) => Ok(Some(Box::new(Block::decode(
buf,
self.block_metas[block_idx].uncompressed_size as usize,
)?))),
Some((buf, uncompressed_size)) => {
Ok(Some(Box::new(Block::decode(buf, uncompressed_size)?)))
}
}
}
}
Expand Down Expand Up @@ -191,14 +183,10 @@ impl BlockStream for LongConnectionBlockStream {
}

async fn next_block(&mut self) -> HummockResult<Option<BlockHolder>> {
let (block, index) = match self.inner.next_block_impl().await? {
Some((buf, index)) => {
let block = Box::new(Block::decode_with_copy(
buf,
self.inner.block_metas[index].uncompressed_size as usize,
true,
)?);
(block, index + self.start_block_index)
let index = self.next_block_index();
let block = match self.inner.next_block_impl().await? {
Some((buf, uncompressed_size)) => {
Box::new(Block::decode_with_copy(buf, uncompressed_size, true)?)
}
None => return Ok(None),
};
Expand Down
7 changes: 1 addition & 6 deletions src/storage/src/hummock/compactor/fast_compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ impl BlockStreamIterator {
}
}

pub fn is_first_block(&self) -> bool {
self.next_block_index == 0
}

/// Wrapper function for `self.block_stream.next()` which allows us to measure the time needed.
async fn download_next_block(&mut self) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
let (data, _) = match self.block_stream.next_block_impl().await? {
Expand Down Expand Up @@ -264,7 +260,7 @@ impl ConcatSstableIterator {
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let block_stream = self
.sstable_store
.get_stream_by_position(sstable.value().id, 0, &sstable.value().meta.block_metas)
.get_stream_by_position(sstable.value().id, &sstable.value().meta.block_metas)
.verbose_instrument_await("stream_iter_get_stream")
.await?;

Expand Down Expand Up @@ -334,7 +330,6 @@ impl CompactorRunner {
builder_factory,
context.compactor_metrics.clone(),
Some(task_progress.clone()),
task_config.is_target_l0_or_lbase,
task_config.table_vnode_partition.clone(),
);
assert_eq!(
Expand Down
3 changes: 1 addition & 2 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ impl SstableStreamIterator {
.sstable_store
.get_stream_by_position(
self.sstable_info.object_id,
self.seek_block_idx,
&self.block_metas,
&self.block_metas[self.seek_block_idx..],
)
.verbose_instrument_await("stream_iter_get_stream")
.await?;
Expand Down
1 change: 0 additions & 1 deletion src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ impl Compactor {
builder_factory,
self.context.compactor_metrics.clone(),
task_progress.clone(),
self.task_config.is_target_l0_or_lbase,
self.task_config.table_vnode_partition.clone(),
);
let compaction_statistics = compact_and_build_sst(
Expand Down
Loading

0 comments on commit 035a879

Please sign in to comment.