Skip to content

Commit

Permalink
chore: simplify upload part
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Dec 28, 2023
1 parent 0d90ed6 commit 0b5544c
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 45 deletions.
39 changes: 0 additions & 39 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,6 @@ pub(crate) struct UploadPart {
pub(crate) file_metas: Vec<FileMeta>,
/// Target storage of SSTs.
storage: Option<String>,
/// Sender to send notify.
request_sender: Option<Sender<WorkerRequest>>,
/// The last entry id has been flushed.
flushed_entry_id: Option<EntryId>,
/// The last sequence has been flushed.
flushed_sequence: Option<SequenceNumber>,
}

/// Writer to build a upload part.
Expand All @@ -101,12 +95,6 @@ pub(crate) struct UploadPartWriter {
file_metas: Vec<FileMeta>,
/// Target storage of SSTs.
storage: Option<String>,
/// Sender to send notify.
request_sender: Option<Sender<WorkerRequest>>,
/// The last entry id has been flushed.
flushed_entry_id: Option<EntryId>,
/// The last sequence has been flushed.
flushed_sequence: Option<SequenceNumber>,
}

impl UploadPartWriter {
Expand All @@ -118,9 +106,6 @@ impl UploadPartWriter {
region_dir: String::new(),
file_metas: Vec::new(),
storage: None,
request_sender: None,
flushed_entry_id: None,
flushed_sequence: None,
}
}

Expand All @@ -138,27 +123,6 @@ impl UploadPartWriter {
self
}

/// Sets request sender for the part.
#[must_use]
pub(crate) fn with_request_sender(mut self, sender: Option<Sender<WorkerRequest>>) -> Self {
self.request_sender = sender;
self
}

/// Sets flushed entry id.
#[must_use]
pub(crate) fn with_flushed_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
self.flushed_entry_id = entry_id;
self
}

/// Sets flushed sequence.
#[must_use]
pub(crate) fn with_flushed_sequence(mut self, sequence: Option<SequenceNumber>) -> Self {
self.flushed_sequence = sequence;
self
}

/// Reserve capacity for `additional` files.
pub(crate) fn reserve_capacity(&mut self, additional: usize) {
self.file_metas.reserve(additional);
Expand Down Expand Up @@ -192,9 +156,6 @@ impl UploadPartWriter {
region_dir: self.region_dir,
file_metas: self.file_metas,
storage: self.storage,
request_sender: self.request_sender,
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
}
}
}
3 changes: 1 addition & 2 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,7 @@ impl TwcsCompactionTask {
let mut part_writer = self
.sst_layer
.upload_part_writer(self.metadata.clone(), &self.cache_manager)
.with_storage(self.storage.clone())
.with_request_sender(Some(self.request_sender.clone()));
.with_storage(self.storage.clone());
for output in self.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));

Expand Down
5 changes: 1 addition & 4 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,7 @@ impl RegionFlushTask {
let mut writer = self
.access_layer
.upload_part_writer(version.metadata.clone(), &self.cache_manager)
.with_storage(version.options.storage.clone())
.with_request_sender(Some(self.request_sender.clone()))
.with_flushed_entry_id(Some(version_data.last_entry_id))
.with_flushed_sequence(Some(version_data.committed_sequence));
.with_storage(version.options.storage.clone());

let worker_request = match self
.flush_memtables(&version_data.version, &mut writer)
Expand Down

0 comments on commit 0b5544c

Please sign in to comment.