Skip to content

Commit

Permalink
Some DB cleanups
Browse files Browse the repository at this point in the history
Hold a lock on commits while one is in progress, but don't lock the
relations (use ArcSwap)
Fix shutdown of DB & db restore test
Make sure Transaction is !Send & !Sync
  • Loading branch information
rdaum committed Jan 26, 2024
1 parent dfb89a5 commit 778b1b9
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 65 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ uuid = { version = "1.7.0", features = ["v4"] }
yoke = "0.7.3"
yoke-derive = "0.7.3"
dashmap = "5.5.3"
arc-swap = "1.6.0"

## Required for MOO builtins.
pwhash = "1.0.0" # For MOO's hokey "crypt" function, which is unix's crypt(3) basically
Expand Down
1 change: 1 addition & 0 deletions crates/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ tokio-eventfd.workspace = true
fast-counter.workspace = true
num-traits.workspace = true
kanal.workspace = true
arc-swap.workspace = true

# For testing & benching common bits
serde_json.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions crates/db/src/odb/rb_worldstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ impl RelBoxWorldState {

// Check the db for sys (#0) object to see if this is a fresh DB or not.
let fresh_db = {
let rels = db.canonical.read().unwrap();
rels[WorldStateRelation::ObjectParent as usize]
db.canonical[WorldStateRelation::ObjectParent as usize]
.load()
.seek_by_domain(SYSTEM_OBJECT.0.as_sliceref())
.is_none()
};
Expand Down
12 changes: 10 additions & 2 deletions crates/db/src/rdb/backing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
//! storage mechanism is desired.
use kanal::Sender;
use std::thread::yield_now;

use crate::rdb::tx::WorkingSet;

pub struct BackingStoreClient {
sender: Sender<WriterMessage>,
join_handle: std::thread::JoinHandle<()>,
}

pub enum WriterMessage {
Expand All @@ -30,8 +32,11 @@ pub enum WriterMessage {
}

impl BackingStoreClient {
pub fn new(sender: Sender<WriterMessage>) -> Self {
Self { sender }
pub fn new(sender: Sender<WriterMessage>, join_handle: std::thread::JoinHandle<()>) -> Self {
Self {
sender,
join_handle,
}
}

/// Sync out the working set from a committed transaction for the given transaction timestamp.
Expand All @@ -48,5 +53,8 @@ impl BackingStoreClient {
self.sender
.send(WriterMessage::Shutdown)
.expect("Unable to send shutdown message");
while !self.join_handle.is_finished() {
yield_now()
}
}
}
4 changes: 2 additions & 2 deletions crates/db/src/rdb/cold_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,13 @@ impl ColdStorage {
// Start the listen loop
let (writer_send, writer_receive) = kanal::unbounded();
let ps = page_storage.clone();
std::thread::Builder::new()
let cs_join = std::thread::Builder::new()
.name("moor-coldstorage-listen".to_string())
.spawn(move || Self::listen_loop(writer_receive, wal, tuple_box.clone(), ps))
.expect("Unable to spawn coldstorage listen thread");

// And return the client to it.
BackingStoreClient::new(writer_send)
BackingStoreClient::new(writer_send, cs_join)
}

fn listen_loop(
Expand Down
12 changes: 6 additions & 6 deletions crates/db/src/rdb/page_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,12 @@ fn make_eventfd() -> Fd {
fn event_fd_listen_thread(event_fd: &Fd, ps: Arc<PageStore>, running_flag: Arc<Mutex<bool>>) {
info!("Listening for eventfd events for page storage");
loop {
let rf = running_flag.clone();
let running = rf.lock().unwrap();
if !*running {
break;
{
let rf = running_flag.clone();
let running = rf.lock().unwrap();
if !*running {
break;
}
}
let mut eventfd_v: libc::eventfd_t = 0;

Expand Down Expand Up @@ -121,8 +123,6 @@ impl PageStore {
buffers: Default::default(),
};



Arc::new(Self {
dir,
next_request_id: AtomicU64::new(0),
Expand Down
72 changes: 48 additions & 24 deletions crates/db/src/rdb/relbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
// TODO: support sorted indices, too.
// TODO: 'join' and transitive closure -> datalog-style variable unification

use arc_swap::ArcSwap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::{Arc, Mutex, MutexGuard};

use tracing::info;

Expand Down Expand Up @@ -57,14 +58,28 @@ pub struct RelBox {
/// Monotonically incrementing sequence counters.
sequences: Vec<AtomicU64>,
/// The copy-on-write set of current canonical base relations.
// TODO: this is a candidate for an optimistic lock.
pub(crate) canonical: RwLock<Vec<BaseRelation>>,
/// Held in ArcSwap so that we can swap them out atomically for their modified versions, without holding
/// a lock for reads.
pub(crate) canonical: Vec<ArcSwap<BaseRelation>>,

/// Lock to hold while committing a transaction.
commit_lock: Mutex<()>,

slotbox: Arc<TupleBox>,

backing_store: Option<BackingStoreClient>,
}

impl Debug for RelBox {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RelBox")
.field("num_relations", &self.relation_info.len())
.field("maximum_transaction", &self.maximum_transaction)
.field("sequences", &self.sequences)
.finish()
}
}

impl RelBox {
pub fn new(
memory_size: usize,
Expand Down Expand Up @@ -104,10 +119,14 @@ impl RelBox {
Arc::new(Self {
relation_info: relations.to_vec(),
maximum_transaction: AtomicU64::new(0),
canonical: RwLock::new(base_relations),
canonical: base_relations
.into_iter()
.map(|b| ArcSwap::new(Arc::new(b)))
.collect(),
sequences,
backing_store,
slotbox,
commit_lock: Mutex::new(()),
})
}

Expand Down Expand Up @@ -161,10 +180,10 @@ impl RelBox {
pub fn with_relation<R, F: Fn(&BaseRelation) -> R>(&self, relation_id: RelationId, f: F) -> R {
f(self
.canonical
.read()
.unwrap()
.get(relation_id.0)
.expect("No such relation"))
.expect("No such relation")
.load()
.as_ref())
}

/// Prepare a commit set for the given transaction. This will scan through the transaction's
Expand All @@ -174,15 +193,16 @@ impl RelBox {
&self,
commit_ts: u64,
tx_working_set: &mut WorkingSet,
) -> Result<CommitSet, CommitError> {
) -> Result<(CommitSet, MutexGuard<()>), CommitError> {
let mut commitset = CommitSet::new(commit_ts);
let commit_guard = self.commit_lock.lock().unwrap();

for (_, local_relation) in tx_working_set.relations.iter_mut() {
let relation_id = local_relation.id;
// scan through the local working set, and for each tuple, check to see if it's safe to
// commit. If it is, then we'll add it to the commit set.
// note we're not actually committing yet, just producing a candidate commit set
let canonical = &self.canonical.read().unwrap()[relation_id.0];
let canonical = &self.canonical[relation_id.0].load();
for mut tuple in local_relation.tuples_mut() {
// Look for the most recent version for this domain in the canonical relation.
let canon_tuple = canonical.seek_by_domain(tuple.domain().clone());
Expand Down Expand Up @@ -246,36 +266,40 @@ impl RelBox {
}
}
}
Ok(commitset)
Ok((commitset, commit_guard))
}

/// Actually commit a transaction's (approved) CommitSet. If the underlying base relations have
/// changed since the transaction started, this will return `Err(RelationContentionConflict)`
/// changed since the commit set began, this will return `Err(RelationContentionConflict)`
/// and the transaction can choose to try to produce a new CommitSet, or just abort.
pub(crate) fn try_commit(&self, commit_set: CommitSet) -> Result<(), CommitError> {
// swap the active canonical state with the new one. but only if the timestamps have not
// changed in the interim; we have to hold a lock while this is done. If any relations have
// had their ts change, we need to retry.
// We have to hold a lock during the duration of this. If we fail, we will loop back
// and retry.
let mut canonical = self.canonical.write().unwrap();
pub(crate) fn try_commit(
&self,
commit_set: CommitSet,
_commit_guard: MutexGuard<()>,
) -> Result<(), CommitError> {
// swap the active canonical state with the new one
// but only if the timestamps have not changed in the interim;
// we have to hold a lock while this is checked. If any relations have
// had their ts change, we probably need to retry because someone else got there first
// This shouldn't happen if we have a commit guard.
for (_, relation) in commit_set.iter() {
// Did the relation get committed to by someone else in the interim? If so, return
// back to the transaction letting it know that, and it can decide if it wants to
// retry.
if relation.ts != canonical[relation.id.0].ts {
// This shouldn't happen if we have a commit guard.
if relation.ts != self.canonical[relation.id.0].load().ts {
return Err(CommitError::RelationContentionConflict);
}
}

// Everything passed, so we can commit the changes by swapping in the new canonical
// before releasing the lock.
let commit_ts = commit_set.ts;
for (_, relation) in commit_set.into_iter() {
for (_, mut relation) in commit_set.into_iter() {
let idx = relation.id.0;
canonical[idx] = relation;
// And update the timestamp on the canonical relation.
canonical[idx].ts = commit_ts;

relation.ts = commit_ts;
self.canonical[idx].store(Arc::new(relation));
}

Ok(())
Expand Down
26 changes: 17 additions & 9 deletions crates/db/src/rdb/tx/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
// this program. If not, see <https://www.gnu.org/licenses/>.
//

use std::cell::RefCell;
use std::cell::{Cell, RefCell};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use std::marker::PhantomData;
use std::sync::{Arc, MutexGuard};
use std::thread::yield_now;

use thiserror::Error;

Expand All @@ -30,6 +31,9 @@ use crate::rdb::tx::relvar::RelVar;
use crate::rdb::tx::working_set::WorkingSet;
use crate::rdb::RelationId;

type PhantomUnsync = PhantomData<Cell<()>>;
type PhantomUnsend = PhantomData<MutexGuard<'static, ()>>;

/// A versioned transaction, which is a fork of the current canonical base relations.
pub struct Transaction {
/// Where we came from, for referencing back to the base relations.
Expand All @@ -38,6 +42,9 @@ pub struct Transaction {
/// to the transaction, and represents the set of values that will be committed to the base
/// relations at commit time.
pub(crate) working_set: RefCell<Option<WorkingSet>>,

unsend: PhantomUnsend,
unsync: PhantomUnsync,
}

/// Errors which can occur during a commit.
Expand All @@ -59,6 +66,8 @@ impl Transaction {
Self {
db,
working_set: RefCell::new(Some(ws)),
unsend: Default::default(),
unsync: Default::default(),
}
}

Expand All @@ -78,11 +87,11 @@ impl Transaction {
tries += 1;
let commit_ts = self.db.clone().next_ts();
let mut working_set = self.working_set.borrow_mut();
let commit_set = self
let (commit_set, commit_guard) = self
.db
.prepare_commit_set(commit_ts, working_set.as_mut().unwrap())?;
match self.db.try_commit(commit_set) {
Ok(_) => {
match self.db.try_commit(commit_set, commit_guard) {
Ok(()) => {
let working_set = working_set.take().unwrap();
self.db.sync(commit_ts, working_set);
return Ok(());
Expand All @@ -92,8 +101,7 @@ impl Transaction {
return Err(CommitError::RelationContentionConflict);
} else {
// Release the lock, pause a bit, and retry the commit set.
drop(working_set);
std::thread::sleep(Duration::from_millis(5));
yield_now();
continue 'retry;
}
}
Expand Down Expand Up @@ -310,7 +318,7 @@ mod tests {

// Verify canonical state.
{
let relation = &db.canonical.read().unwrap()[0];
let relation = &db.canonical[0].load();
let tuple = relation
.seek_by_domain(attr(b"abc"))
.expect("Expected tuple to exist");
Expand Down
Loading

0 comments on commit 778b1b9

Please sign in to comment.