Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some update for JSegcache #452

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
2 changes: 2 additions & 0 deletions src/storage/seg/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use thiserror::Error;
#[derive(Error, Debug, PartialEq, Eq, Copy, Clone)]
/// Possible errors returned by the top-level API
pub enum SegError {
#[error("key size too large")]
KeySizeTooLargeEx,
#[error("hashtable insert exception")]
HashTableInsertEx,
#[error("eviction exception")]
Expand Down
99 changes: 99 additions & 0 deletions src/storage/seg/src/hashtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,10 @@ impl HashTable {
*item_info = (*item_info & !FREQ_MASK) | freq;
}

let age = segments.get_age(*item_info).unwrap();
let item = Item::new(
current_item,
age,
get_cas(self.data[(hash & self.mask) as usize].data[0]),
);
item.check_magic();
Expand All @@ -322,6 +324,101 @@ impl HashTable {
None
}

/// Look up an item by key and return its age as reported by
/// `Segments::get_age` for the matching hash table entry.
///
/// Returns `None` when no entry visited for this key's hash matches `key`.
/// Entries whose tag matches but whose stored key differs are counted in
/// `HASH_TAG_COLLISION` and skipped.
///
/// # Panics
///
/// Panics if `segments.get_item` returns `None` for a tag-matching entry.
pub fn get_age(&mut self, key: &[u8], segments: &mut Segments) -> Option<u32> {
    let hash = self.hash(key);
    let tag = tag_from_hash(hash);

    for entry in IterMut::new(self, hash) {
        // cheap filter first: only entries with a matching tag are candidates
        if get_tag(*entry) != tag {
            continue;
        }

        let candidate = segments.get_item(*entry).unwrap();
        if candidate.key() == key {
            return segments.get_age(*entry);
        }

        // same tag, different key
        HASH_TAG_COLLISION.increment();
    }

    None
}

/// Look up an item by key and return it together with information about the
/// hash table entry it was read from.
///
/// Compared to `get`, this is designed to support multiple readers and a
/// single writer through optimistic concurrency control. Eviction always
/// removes the hash table entry first, so if an item has been evicted, its
/// hash table entry must already have been removed. A reader can therefore
/// read/copy the value first and verify the hash table entry afterwards: if
/// the entry has changed, the data that was read must be discarded.
///
/// The returned `RichItem` is built from the entry value as read before the
/// frequency update, with the frequency bits masked out, plus a reference
/// to the entry itself (passed as the `item_info_ptr` argument).
///
/// Returns `None` when no entry visited for this key's hash matches `key`.
///
/// # Panics
///
/// Panics if `segments.get_item` or `segments.get_age` returns `None` for a
/// tag-matching entry.
pub fn get_with_item_info(
&mut self,
key: &[u8],
time: Instant,
segments: &mut Segments,
) -> Option<RichItem> {
let hash = self.hash(key);
let tag = tag_from_hash(hash);
let bucket_id = hash & self.mask;

let bucket_info = self.data[bucket_id as usize].data[0];

// seconds between `self.started` and `time`, masked with PROC_TS_MASK
let curr_ts = (time - self.started).as_secs() & PROC_TS_MASK;

// when the timestamp stored in the bucket info differs from the current one,
// store the new timestamp and apply CLEAR_FREQ_SMOOTH_MASK to every entry
// visited for this hash
if curr_ts != get_ts(bucket_info) as u32 {
self.data[bucket_id as usize].data[0] = (bucket_info & !TS_MASK) | (curr_ts as u64);

let iter = IterMut::new(self, hash);
for item_info in iter {
*item_info &= CLEAR_FREQ_SMOOTH_MASK;
}
}

// the CAS value is read from `data[0]` of the bucket before iterating
let cas = get_cas(self.data[(hash & self.mask) as usize].data[0]);
let iter = IterMut::new(self, hash);

for item_info in iter {
// snapshot of the entry taken before the frequency bits are modified below
let item_info_val = *item_info;
if get_tag(item_info_val) == tag {
let current_item = segments.get_item(*item_info).unwrap();
if current_item.key() != key {
// same tag, different key
HASH_TAG_COLLISION.increment();
} else {
// update item frequency: below 127, the counter is always incremented
// while it is <= 16, otherwise only when a random u64 is divisible by
// the current value; 0x80 is OR-ed in on both paths
// NOTE(review): 0x80 is presumably the bit cleared by
// CLEAR_FREQ_SMOOTH_MASK above - confirm
let mut freq = get_freq(*item_info);
if freq < 127 {
let rand = thread_rng().gen::<u64>();
if freq <= 16 || rand % freq == 0 {
freq = ((freq + 1) | 0x80) << FREQ_BIT_SHIFT;
} else {
freq = (freq | 0x80) << FREQ_BIT_SHIFT;
}
*item_info = (*item_info & !FREQ_MASK) | freq;
}

let age = segments.get_age(item_info_val).unwrap();
// the snapshot is stored with the frequency bits masked out
let item = RichItem::new(
current_item,
age,
cas,
item_info_val & !FREQ_MASK,
item_info
);
item.check_magic();

return Some(item);
}
}
}

None
}

/// Lookup an item by key and return it without incrementing the item
/// frequency. This may be used to compose higher-level functions which do
/// not want a successful item lookup to count as a hit for that item.
Expand All @@ -338,8 +435,10 @@ impl HashTable {
if current_item.key() != key {
HASH_TAG_COLLISION.increment();
} else {
let age = segments.get_age(*item_info).unwrap();
let item = Item::new(
current_item,
age,
get_cas(self.data[(hash & self.mask) as usize].data[0]),
);
item.check_magic();
Expand Down
108 changes: 106 additions & 2 deletions src/storage/seg/src/item/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod reserved;
#[cfg(any(feature = "magic", feature = "debug"))]
pub(crate) use header::ITEM_MAGIC_SIZE;

use crate::hashtable::FREQ_MASK;
use crate::SegError;
use crate::Value;

Expand All @@ -21,13 +22,14 @@ pub(crate) use reserved::ReservedItem;
/// Items are the base unit of data stored within the cache.
pub struct Item {
cas: u32,
age: u32,
raw: RawItem,
}

impl Item {
/// Creates a new `Item` from its parts
pub(crate) fn new(raw: RawItem, cas: u32) -> Self {
Item { cas, raw }
pub(crate) fn new(raw: RawItem, age: u32, cas: u32) -> Self {
Item { cas, age, raw }
}

/// If the `magic` or `debug` features are enabled, this allows for checking
Expand Down Expand Up @@ -56,6 +58,10 @@ impl Item {
self.cas
}

pub fn age(&self) -> u32 {
self.age
}

/// Borrow the optional data
pub fn optional(&self) -> Option<&[u8]> {
self.raw.optional()
Expand Down Expand Up @@ -83,6 +89,104 @@ impl std::fmt::Debug for Item {
}
}

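/// An `Item` bundled with a snapshot of the hash table entry it was read from
/// and a pointer to that entry, so a reader can later check whether the entry
/// has changed.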
pub struct RichItem {
Copy link
Contributor

@brayniac brayniac Jul 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this can't be:

pub struct RichItem {
  item: Item,
  item_info: u64,
  item_info_ptr: *const u64,
}

It seems like re-using the Item type and having the wrapper type contain the additional fields for "rich" functionality would be a better way to go here

Copy link
Author

@JunchengYTwitter JunchengYTwitter Jul 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, your proposal is better, I will change it :)

item: Item,
item_info: u64,
item_info_ptr: *const u64,
}

impl RichItem {
/// Creates a new `Item` from its parts
pub(crate) fn new(
raw: RawItem,
age: u32,
cas: u32,
item_info: u64,
item_info_ptr: *const u64,
) -> Self {
let item = Item::new(raw, age, cas);
RichItem {
item,
item_info,
item_info_ptr,
}
}

/// If the `magic` or `debug` features are enabled, this allows for checking
/// that the magic bytes at the start of an item match the expected value.
/// Delegates to the wrapped item's raw item.
///
/// # Panics
///
/// Panics if the magic bytes are incorrect, indicating that the data has
/// become corrupted or the item was loaded from the wrong offset.
pub(crate) fn check_magic(&self) {
self.item.raw.check_magic()
}

/// Borrow the item key
pub fn key(&self) -> &[u8] {
self.item.raw.key()
}

/// Borrow the item value
pub fn value(&self) -> Value {
self.item.raw.value()
}

/// CAS value for the item
pub fn cas(&self) -> u32 {
self.item.cas
}

/// Age of the item, as stored in the wrapped `Item` when it was created
pub fn age(&self) -> u32 {
self.item.age
}

/// Borrow the wrapped `Item`
pub fn item(&self) -> &Item {
&self.item
}

/// Mutably borrow the wrapped `Item`
pub fn item_mut(&mut self) -> &mut Item {
&mut self.item
}

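// Used to support multiple readers and a single writer.
// Returns true if the hash table entry (ignoring the frequency bits) still
// equals the value stored when the item was read, i.e. the item has NOT been
// evicted/updated since being read from the hash table.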
pub fn is_not_changed(&self) -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love the `is_not_` here - can this just be `is_current()`?

unsafe { return self.item_info == *self.item_info_ptr & !FREQ_MASK }
}

/// Borrow the optional data of the wrapped item; `None` is returned when the
/// underlying raw item reports none.
pub fn optional(&self) -> Option<&[u8]> {
self.item.raw.optional()
}

/// Perform a wrapping addition on the value. Returns an error if the item
/// is not a numeric type. Delegates to the wrapped item's raw item.
pub fn wrapping_add(&mut self, rhs: u64) -> Result<(), SegError> {
self.item.raw.wrapping_add(rhs)
}

/// Perform a saturating subtraction on the value. Returns an error if the
/// item is not a numeric type. Delegates to the wrapped item's raw item.
pub fn saturating_sub(&mut self, rhs: u64) -> Result<(), SegError> {
self.item.raw.saturating_sub(rhs)
}
}

impl std::fmt::Debug for RichItem {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
f.debug_struct("Item")
.field("cas", &self.cas())
.field("raw", &self.item.raw)
.field("item_info", &self.item_info)
.field("item_info_ptr", &self.item_info_ptr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this print the ptr address? Is that really useful?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is not useful; printing the value itself is not very useful either. I can print all three (two values, one pointer), or we can remove the print. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like printing if it's stale or not would maybe have value here?

.finish()
}
}

pub fn size_of(value: &Value) -> usize {
match value {
Value::Bytes(v) => v.len(),
Expand Down
2 changes: 1 addition & 1 deletion src/storage/seg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub use crate::seg::Seg;
pub use builder::Builder;
pub use error::SegError;
pub use eviction::Policy;
pub use item::Item;
pub use item::{Item, RichItem};

// publicly exported items from external crates
pub use storage_types::Value;
Expand Down
55 changes: 55 additions & 0 deletions src/storage/seg/src/seg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,50 @@ impl Seg {
self.hashtable.get(key, self.time, &mut self.segments)
}

/// Get the age of the item in the `Seg` with the provided key.
///
/// Delegates to the hash table's `get_age`; `None` is returned when that
/// lookup does not find a matching item.
///
/// ```
/// use seg::{Policy, Seg};
/// use std::time::Duration;
///
/// let mut cache = Seg::builder().build().expect("failed to create cache");
/// assert!(cache.get(b"coffee").is_none());
///
/// cache.insert(b"coffee", b"strong", None, Duration::ZERO);
/// let age = cache.get_age(b"coffee").expect("didn't get item back");
/// assert_eq!(age, 0);
/// ```
pub fn get_age(&mut self, key: &[u8]) -> Option<u32> {
self.hashtable.get_age(key, &mut self.segments)
}

/// Get the item in the `Seg` with the provided key, together with
/// information about its hash table entry.
///
/// This differs from `get` by also returning information about the hash
/// table entry, which allows optimistic concurrency control and enables
/// multiple readers and a single writer.
///
/// To use it, check that the hash table entry has not changed after
/// copying/using the item value. If it has changed, the item was evicted or
/// updated by another thread and the read must be rolled back.
///
/// ```
/// use seg::{Policy, Seg};
/// use std::time::Duration;
///
/// let mut cache = Seg::builder().build().expect("failed to create cache");
/// assert!(cache.get(b"coffee").is_none());
///
/// cache.insert(b"coffee", b"strong", None, Duration::ZERO);
/// let item = cache.get_with_item_info(b"coffee").expect("didn't get item back");
/// assert_eq!(item.value(), b"strong");
/// assert!(item.is_not_changed());
/// cache.insert(b"coffee", b"notStrong", None, Duration::ZERO);
/// assert!(!item.is_not_changed());
/// ```
pub fn get_with_item_info(&mut self, key: &[u8]) -> Option<RichItem> {
self.hashtable
.get_with_item_info(key, self.time, &mut self.segments)
}

/// Get the item in the `Seg` with the provided key without
/// increasing the item frequency - useful for combined operations that
/// check for presence - eg replace is a get + set
Expand Down Expand Up @@ -123,6 +167,10 @@ impl Seg {
) -> Result<(), SegError> {
let value: Value = value.into();

if key.len() > 255 {
return Err(SegError::KeySizeTooLargeEx);
}

// default optional data is empty
let optional = optional.unwrap_or(&[]);

Expand All @@ -133,6 +181,8 @@ impl Seg {

// try to get a `ReservedItem`
let mut retries = RESERVE_RETRIES;
let mut has_removed_expired = false;

let reserved;
loop {
match self
Expand All @@ -149,6 +199,11 @@ impl Seg {
return Err(SegError::ItemOversized { size });
}
Err(TtlBucketsError::NoFreeSegments) => {
if !has_removed_expired {
self.expire();
has_removed_expired = true;
continue;
}
if self
.segments
.evict(&mut self.ttl_buckets, &mut self.hashtable)
Expand Down
2 changes: 1 addition & 1 deletion src/storage/seg/src/segments/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::*;

// the minimum age of a segment before it is eligible for eviction
// TODO(bmartin): this should be parameterized.
const SEG_MATURE_TIME: Duration = Duration::from_secs(20);
const SEG_MATURE_TIME: Duration = Duration::from_secs(0);

#[derive(Debug)]
#[repr(C)]
Expand Down
10 changes: 10 additions & 0 deletions src/storage/seg/src/segments/segments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ impl Segments {
self.get_item_at(seg_id, offset)
}

/// Returns the number of seconds elapsed since the `create_at()` time of the
/// segment referenced by `item_info`.
///
/// Returns `None` when `get_seg_id` yields no segment id for `item_info`.
pub(crate) fn get_age(&self, item_info: u64) -> Option<u32> {
    get_seg_id(item_info).map(|id| {
        // segment ids start at 1, headers are indexed from 0
        let header = &self.headers[id.get() as usize - 1];
        header.create_at().elapsed().as_secs()
    })
}

/// Retrieve a `RawItem` from a specific segment id at the given offset
// TODO(bmartin): consider changing the return type here and removing asserts?
pub(crate) fn get_item_at(
Expand Down