chore: ensure safety condition of unsafe blocks
ethe committed Jul 23, 2024
1 parent 47571a9 commit 196b579
Showing 8 changed files with 46 additions and 49 deletions.
4 changes: 2 additions & 2 deletions src/arrows/mod.rs
@@ -32,11 +32,11 @@ where
let key = match range {
Bound::Included(key) => {
cmp = &gt_eq;
Some(unsafe { &*(key as *const _) })
Some(&*(key as *const _))
}
Bound::Excluded(key) => {
cmp = &gt;
Some(unsafe { &*(key as *const _) })
Some(&*(key as *const _))
}
Bound::Unbounded => {
cmp = &|this, _| {
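The two inner unsafe blocks are dropped here, apparently because the enclosing get_range_filter now carries the obligation itself; the call in src/ondisk/sstable.rs below is wrapped in unsafe with a safety comment, which is consistent with the function having become an unsafe fn. A minimal sketch of that pattern, using a hypothetical helper name rather than the real signature:

/// Hypothetical stand-in for the raw-pointer casts above: instead of an inner
/// `unsafe` block, the whole function is `unsafe` and states its contract.
///
/// # Safety
/// The caller must keep `key` alive, and not move it, for as long as the
/// returned reference is used.
unsafe fn extend_key_lifetime<'long, K>(key: &K) -> &'long K {
    &*(key as *const K)
}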
5 changes: 4 additions & 1 deletion src/compaction/mod.rs
@@ -301,7 +301,10 @@ where
version_edits: &mut Vec<VersionEdit<<R as Record>::Key>>,
level: usize,
streams: Vec<ScanStream<'scan, R, FP>>,
) -> Result<(), CompactionError<R>> {
) -> Result<(), CompactionError<R>>
where
FP: 'scan,
{
let mut stream = MergeStream::<R, FP>::from_vec(streams).await?;

// Kould: is the capacity parameter necessary?
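The added where FP: 'scan bound records that the file-provider type must not contain borrows that end before 'scan, which the merge stream needs in order to keep FP-based scans alive for that long. A small sketch, with hypothetical names, of the kind of signature that forces such a bound:

// Hypothetical sketch: without the `FP: 'scan` bound, the boxed iterator could
// not be promised to live for 'scan, because FP might contain a shorter borrow.
fn into_stream<'scan, FP: 'scan>(scans: Vec<FP>) -> Box<dyn Iterator<Item = FP> + 'scan> {
    Box::new(scans.into_iter())
}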
18 changes: 9 additions & 9 deletions src/inmem/immutable.rs
@@ -10,10 +10,7 @@ use super::mutable::Mutable;
use crate::{
record::{internal::InternalRecordRef, Key, Record, RecordRef},
stream::record_batch::RecordBatchEntry,
timestamp::{
timestamped::{Timestamped, TimestampedRef},
Timestamp, EPOCH,
},
timestamp::{Timestamp, Timestamped, TimestampedRef, EPOCH},
};

pub trait ArrowArrays: Sized {
@@ -154,10 +151,13 @@ where
self.range.next().map(|(_, &offset)| {
let record_ref = R::Ref::from_record_batch(self.record_batch, offset as usize);
// TODO: remove cloning record batch
RecordBatchEntry::new(self.record_batch.clone(), unsafe {
transmute::<InternalRecordRef<R::Ref<'_>>, InternalRecordRef<R::Ref<'static>>>(
record_ref,
)
RecordBatchEntry::new(self.record_batch.clone(), {
// Safety: record_ref self-references the record batch
unsafe {
transmute::<InternalRecordRef<R::Ref<'_>>, InternalRecordRef<R::Ref<'static>>>(
record_ref,
)
}
})
})
}
@@ -179,7 +179,7 @@ pub(crate) mod tests {
use crate::{
record::Record,
tests::{Test, TestRef},
timestamp::Timestamped,
timestamp::timestamped::Timestamped,
};

#[derive(Debug)]
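The added Safety comment records the invariant that makes the 'static transmute sound: record_ref borrows from the same record batch that the entry clones and stores, so the borrowed data lives exactly as long as the entry itself. A simplified, purely illustrative sketch of that self-referential pattern (OwnedView and its fields are hypothetical, not types from this crate):

use std::{mem::transmute, sync::Arc};

// Hypothetical stand-in for RecordBatchEntry: a borrowed view stored next to the
// owner that backs it, so stretching the view's lifetime to 'static cannot dangle.
struct OwnedView {
    _owner: Arc<String>, // keeps the backing allocation alive
    view: &'static str,  // actually borrows from `_owner`
}

impl OwnedView {
    fn new(owner: Arc<String>) -> Self {
        let view: &str = owner.as_str();
        // Safety: `view` points into the heap allocation behind `owner`, which is
        // moved into the same struct below; moving an Arc does not move its
        // allocation, so the reference stays valid for as long as the struct lives.
        let view = unsafe { transmute::<&str, &'static str>(view) };
        OwnedView { _owner: owner, view }
    }

    fn view(&self) -> &str {
        // Hand the borrow back with the struct's lifetime, never as 'static.
        self.view
    }
}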
14 changes: 10 additions & 4 deletions src/ondisk/scan.rs
@@ -1,4 +1,5 @@
use std::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
@@ -16,26 +17,31 @@ use crate::{

pin_project! {
#[derive(Debug)]
pub struct SsTableScan<R, FP>
pub struct SsTableScan<'scan, R, FP>
where
FP: FileProvider,
{
#[pin]
stream: ParquetRecordBatchStream<Compat<FP::File>>,
iter: Option<RecordBatchIterator<R>>,
_marker: PhantomData<&'scan ()>
}
}

impl<R, FP> SsTableScan<R, FP>
impl<R, FP> SsTableScan<'_, R, FP>
where
FP: FileProvider,
{
pub fn new(stream: ParquetRecordBatchStream<Compat<FP::File>>) -> Self {
SsTableScan { stream, iter: None }
SsTableScan {
stream,
iter: None,
_marker: PhantomData,
}
}
}

impl<R, FP> Stream for SsTableScan<R, FP>
impl<'scan, R, FP> Stream for SsTableScan<'scan, R, FP>
where
R: Record,
FP: FileProvider,
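SsTableScan gains a 'scan lifetime even though no field borrows anything directly; the PhantomData<&'scan ()> marker is what ties the struct to that lifetime so the borrow checker treats it as borrowing for 'scan. A minimal sketch of the marker pattern with hypothetical names:

use std::marker::PhantomData;

// The struct stores nothing for 'scan, but the marker makes it act as if it held
// a borrow, so it cannot be kept alive past the data its row filter points into.
struct Scan<'scan> {
    _marker: PhantomData<&'scan ()>,
}

fn scan_with<'scan>(_filter: &'scan [u8]) -> Scan<'scan> {
    Scan { _marker: PhantomData }
}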
3 changes: 3 additions & 0 deletions src/ondisk/sstable.rs
@@ -111,6 +111,9 @@ where
let builder = self.into_parquet_builder(limit).await?;

let schema_descriptor = builder.metadata().file_metadata().schema_descr();

// Safety: filter's lifetime relies on range's lifetime, sstable must not live longer than
// it
let filter = unsafe { get_range_filter::<R>(schema_descriptor, range, ts) };

Ok(SsTableScan::new(builder.with_row_filter(filter).build()?))
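This hunk is the caller side of the pattern sketched after the src/arrows/mod.rs diff above: once a function is unsafe, each call site states why its contract holds, here in terms of the filter not outliving range. A hedged caller-side sketch reusing the hypothetical helper from that earlier block:

// Hypothetical helper from the earlier sketch, repeated so this block is
// self-contained.
unsafe fn extend_key_lifetime<'long, K>(key: &K) -> &'long K {
    &*(key as *const K)
}

fn starts_with_a(key: &String) -> bool {
    // Safety: the extended reference never escapes this function, and the caller's
    // borrow of `key` stays live for its whole body, so nothing can dangle.
    let extended: &'static String = unsafe { extend_key_lifetime(key) };
    extended.starts_with('a')
}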
6 changes: 4 additions & 2 deletions src/stream/level.rs
@@ -27,9 +27,11 @@ where
FP: FileProvider,
{
Init(FileId),
Ready(SsTableScan<R, FP>),
Ready(SsTableScan<'level, R, FP>),
OpenFile(Pin<Box<dyn Future<Output = io::Result<FP::File>> + 'level>>),
LoadStream(Pin<Box<dyn Future<Output = Result<SsTableScan<R, FP>, ParquetError>> + 'level>>),
LoadStream(
Pin<Box<dyn Future<Output = Result<SsTableScan<'level, R, FP>, ParquetError>> + 'level>>,
),
}

pub(crate) struct LevelStream<'level, R, FP>
13 changes: 7 additions & 6 deletions src/stream/mod.rs
@@ -39,9 +39,10 @@ where
{
pub(crate) fn key(&self) -> Timestamped<<R::Key as Key>::Ref<'_>> {
match self {
Entry::Mutable(entry) => entry
.key()
.map(|key| unsafe { transmute(key.as_key_ref()) }),
Entry::Mutable(entry) => entry.key().map(|key| {
// Safety: shorter lifetime must be safe
unsafe { transmute(key.as_key_ref()) }
}),
Entry::SsTable(entry) => entry.internal_key(),
Entry::Immutable(entry) => entry.internal_key(),
Entry::Level(entry) => entry.internal_key(),
@@ -95,7 +96,7 @@ pin_project! {
},
SsTable {
#[pin]
inner: SsTableScan<R, FP>,
inner: SsTableScan<'scan, R, FP>,
},
Level {
#[pin]
@@ -128,12 +129,12 @@ where
}
}

impl<'scan, R, FP> From<SsTableScan<R, FP>> for ScanStream<'scan, R, FP>
impl<'scan, R, FP> From<SsTableScan<'scan, R, FP>> for ScanStream<'scan, R, FP>
where
R: Record,
FP: FileProvider,
{
fn from(inner: SsTableScan<R, FP>) -> Self {
fn from(inner: SsTableScan<'scan, R, FP>) -> Self {
ScanStream::SsTable { inner }
}
}
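The comment "shorter lifetime must be safe" leans on a general fact: a reference valid for a long region is also valid for any region inside it, and the compiler normally performs that coercion on its own. Here the lifetime is hidden behind the key's generic associated type, so the coercion has to be spelled out with transmute. A small sketch of the underlying claim, using plain &str instead of the real key type:

use std::mem::transmute;

fn shorten<'short, 'long: 'short>(long: &'long str) -> &'short str {
    // Safety: 'long outlives 'short, so a reference valid for 'long is also valid
    // for 'short; the transmute changes only the lifetime, not the layout.
    unsafe { transmute::<&'long str, &'short str>(long) }
}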
32 changes: 7 additions & 25 deletions src/timestamp/timestamped.rs
@@ -104,13 +104,7 @@ where
V: PartialEq,
{
fn eq(&self, other: &Self) -> bool {
unsafe {
let this = transmute::<&TimestampedRef<V>, [usize; 2]>(self);
let other = transmute::<&TimestampedRef<V>, [usize; 2]>(other);
let this_value = transmute::<usize, &V>(this[0]);
let other_value = transmute::<usize, &V>(other[0]);
this_value == other_value && this[1] == other[1]
}
self.value() == other.value() && self.ts() == other.ts()
}
}

@@ -121,15 +115,9 @@
V: PartialOrd,
{
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
unsafe {
let this = transmute::<&TimestampedRef<V>, [usize; 2]>(self);
let other = transmute::<&TimestampedRef<V>, [usize; 2]>(other);
let this_value = transmute::<usize, &V>(this[0]);
let other_value = transmute::<usize, &V>(other[0]);
this_value
.partial_cmp(other_value)
.map(|ordering| ordering.then_with(|| other[1].cmp(&this[1])))
}
self.value()
.partial_cmp(other.value())
.map(|ordering| ordering.then_with(|| other.ts().cmp(&self.ts())))
}
}

@@ -138,15 +126,9 @@
K: Ord,
{
fn cmp(&self, other: &Self) -> Ordering {
unsafe {
let this = transmute::<&TimestampedRef<K>, [usize; 2]>(self);
let other = transmute::<&TimestampedRef<K>, [usize; 2]>(other);
let this_value = transmute::<usize, &K>(this[0]);
let other_value = transmute::<usize, &K>(other[0]);
this_value
.cmp(other_value)
.then_with(|| other[1].cmp(&this[1]))
}
self.value()
.cmp(other.value())
.then_with(|| other.ts().cmp(&self.ts()))
}
}

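The rewritten comparisons read the value and timestamp through accessors instead of reinterpreting the reference as two usize words, while keeping the same ordering: ascending by value and, for equal values, descending by timestamp (note the swapped other/self in the timestamp comparison), which presumably lets a merged scan meet the newest version of a key first. A standalone sketch of that ordering with a hypothetical simplified type:

use std::cmp::Ordering;

// Hypothetical simplified stand-in for Timestamped/TimestampedRef.
#[derive(PartialEq, Eq)]
struct Versioned<K> {
    key: K,
    ts: u32,
}

impl<K: Ord> Ord for Versioned<K> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Ascending by key, then descending by timestamp: `other` and `self` are
        // deliberately swapped in the timestamp comparison.
        self.key
            .cmp(&other.key)
            .then_with(|| other.ts.cmp(&self.ts))
    }
}

impl<K: Ord> PartialOrd for Versioned<K> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}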
