From 778b1b95ad1445998f734ecce81fbe0174b74a62 Mon Sep 17 00:00:00 2001 From: Ryan Daum Date: Thu, 25 Jan 2024 19:55:26 -0500 Subject: [PATCH] Some DB cleanups Hold a lock on commits while one is in progress, but don't lock the relations (use ArcSwap) Fix shutdown of DB & db restore test Make sure Transaction is !Send & !Sync --- Cargo.lock | 7 +++ Cargo.toml | 1 + crates/db/Cargo.toml | 1 + crates/db/src/odb/rb_worldstate.rs | 4 +- crates/db/src/rdb/backing.rs | 12 ++++- crates/db/src/rdb/cold_storage.rs | 4 +- crates/db/src/rdb/page_storage.rs | 12 ++--- crates/db/src/rdb/relbox.rs | 72 +++++++++++++++++++---------- crates/db/src/rdb/tx/transaction.rs | 26 +++++++---- crates/db/tests/rdb_restore.rs | 33 ++++++------- 10 files changed, 107 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b447f04e..f35f31df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -146,6 +146,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "arc-swap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" + [[package]] name = "async-compression" version = "0.4.4" @@ -1952,6 +1958,7 @@ dependencies = [ name = "moor-db" version = "0.1.0" dependencies = [ + "arc-swap", "atomic-wait", "binary-layout", "bincode", diff --git a/Cargo.toml b/Cargo.toml index b14b89ce..b957b741 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,7 @@ uuid = { version = "1.7.0", features = ["v4"] } yoke = "0.7.3" yoke-derive = "0.7.3" dashmap = "5.5.3" +arc-swap = "1.6.0" ## Required for MOO builtins. pwhash = "1.0.0" # For MOO's hokey "crypt" function, which is unix's crypt(3) basically diff --git a/crates/db/Cargo.toml b/crates/db/Cargo.toml index 2e6f477f..32e855bf 100644 --- a/crates/db/Cargo.toml +++ b/crates/db/Cargo.toml @@ -48,6 +48,7 @@ tokio-eventfd.workspace = true fast-counter.workspace = true num-traits.workspace = true kanal.workspace = true +arc-swap.workspace = true # For testing & benching common bits serde_json.workspace = true diff --git a/crates/db/src/odb/rb_worldstate.rs b/crates/db/src/odb/rb_worldstate.rs index 6290fca9..d41d5c08 100644 --- a/crates/db/src/odb/rb_worldstate.rs +++ b/crates/db/src/odb/rb_worldstate.rs @@ -73,8 +73,8 @@ impl RelBoxWorldState { // Check the db for sys (#0) object to see if this is a fresh DB or not. let fresh_db = { - let rels = db.canonical.read().unwrap(); - rels[WorldStateRelation::ObjectParent as usize] + db.canonical[WorldStateRelation::ObjectParent as usize] + .load() .seek_by_domain(SYSTEM_OBJECT.0.as_sliceref()) .is_none() }; diff --git a/crates/db/src/rdb/backing.rs b/crates/db/src/rdb/backing.rs index 381981b1..d677bb02 100644 --- a/crates/db/src/rdb/backing.rs +++ b/crates/db/src/rdb/backing.rs @@ -17,11 +17,13 @@ //! storage mechanism is desired. use kanal::Sender; +use std::thread::yield_now; use crate::rdb::tx::WorkingSet; pub struct BackingStoreClient { sender: Sender, + join_handle: std::thread::JoinHandle<()>, } pub enum WriterMessage { @@ -30,8 +32,11 @@ pub enum WriterMessage { } impl BackingStoreClient { - pub fn new(sender: Sender) -> Self { - Self { sender } + pub fn new(sender: Sender, join_handle: std::thread::JoinHandle<()>) -> Self { + Self { + sender, + join_handle, + } } /// Sync out the working set from a committed transaction for the given transaction timestamp. @@ -48,5 +53,8 @@ impl BackingStoreClient { self.sender .send(WriterMessage::Shutdown) .expect("Unable to send shutdown message"); + while !self.join_handle.is_finished() { + yield_now() + } } } diff --git a/crates/db/src/rdb/cold_storage.rs b/crates/db/src/rdb/cold_storage.rs index 3741f9ae..e1242f0c 100644 --- a/crates/db/src/rdb/cold_storage.rs +++ b/crates/db/src/rdb/cold_storage.rs @@ -139,13 +139,13 @@ impl ColdStorage { // Start the listen loop let (writer_send, writer_receive) = kanal::unbounded(); let ps = page_storage.clone(); - std::thread::Builder::new() + let cs_join = std::thread::Builder::new() .name("moor-coldstorage-listen".to_string()) .spawn(move || Self::listen_loop(writer_receive, wal, tuple_box.clone(), ps)) .expect("Unable to spawn coldstorage listen thread"); // And return the client to it. - BackingStoreClient::new(writer_send) + BackingStoreClient::new(writer_send, cs_join) } fn listen_loop( diff --git a/crates/db/src/rdb/page_storage.rs b/crates/db/src/rdb/page_storage.rs index 4e392c56..fe57e496 100644 --- a/crates/db/src/rdb/page_storage.rs +++ b/crates/db/src/rdb/page_storage.rs @@ -80,10 +80,12 @@ fn make_eventfd() -> Fd { fn event_fd_listen_thread(event_fd: &Fd, ps: Arc, running_flag: Arc>) { info!("Listening for eventfd events for page storage"); loop { - let rf = running_flag.clone(); - let running = rf.lock().unwrap(); - if !*running { - break; + { + let rf = running_flag.clone(); + let running = rf.lock().unwrap(); + if !*running { + break; + } } let mut eventfd_v: libc::eventfd_t = 0; @@ -121,8 +123,6 @@ impl PageStore { buffers: Default::default(), }; - - Arc::new(Self { dir, next_request_id: AtomicU64::new(0), diff --git a/crates/db/src/rdb/relbox.rs b/crates/db/src/rdb/relbox.rs index f4cd58c0..b3d99cea 100644 --- a/crates/db/src/rdb/relbox.rs +++ b/crates/db/src/rdb/relbox.rs @@ -15,10 +15,11 @@ // TODO: support sorted indices, too. // TODO: 'join' and transitive closure -> datalog-style variable unification +use arc_swap::ArcSwap; +use std::fmt::Debug; use std::path::PathBuf; use std::sync::atomic::AtomicU64; -use std::sync::Arc; -use std::sync::RwLock; +use std::sync::{Arc, Mutex, MutexGuard}; use tracing::info; @@ -57,14 +58,28 @@ pub struct RelBox { /// Monotonically incrementing sequence counters. sequences: Vec, /// The copy-on-write set of current canonical base relations. - // TODO: this is a candidate for an optimistic lock. - pub(crate) canonical: RwLock>, + /// Held in ArcSwap so that we can swap them out atomically for their modified versions, without holding + /// a lock for reads. + pub(crate) canonical: Vec>, + + /// Lock to hold while committing a transaction. + commit_lock: Mutex<()>, slotbox: Arc, backing_store: Option, } +impl Debug for RelBox { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RelBox") + .field("num_relations", &self.relation_info.len()) + .field("maximum_transaction", &self.maximum_transaction) + .field("sequences", &self.sequences) + .finish() + } +} + impl RelBox { pub fn new( memory_size: usize, @@ -104,10 +119,14 @@ impl RelBox { Arc::new(Self { relation_info: relations.to_vec(), maximum_transaction: AtomicU64::new(0), - canonical: RwLock::new(base_relations), + canonical: base_relations + .into_iter() + .map(|b| ArcSwap::new(Arc::new(b))) + .collect(), sequences, backing_store, slotbox, + commit_lock: Mutex::new(()), }) } @@ -161,10 +180,10 @@ impl RelBox { pub fn with_relation R>(&self, relation_id: RelationId, f: F) -> R { f(self .canonical - .read() - .unwrap() .get(relation_id.0) - .expect("No such relation")) + .expect("No such relation") + .load() + .as_ref()) } /// Prepare a commit set for the given transaction. This will scan through the transaction's @@ -174,15 +193,16 @@ impl RelBox { &self, commit_ts: u64, tx_working_set: &mut WorkingSet, - ) -> Result { + ) -> Result<(CommitSet, MutexGuard<()>), CommitError> { let mut commitset = CommitSet::new(commit_ts); + let commit_guard = self.commit_lock.lock().unwrap(); for (_, local_relation) in tx_working_set.relations.iter_mut() { let relation_id = local_relation.id; // scan through the local working set, and for each tuple, check to see if it's safe to // commit. If it is, then we'll add it to the commit set. // note we're not actually committing yet, just producing a candidate commit set - let canonical = &self.canonical.read().unwrap()[relation_id.0]; + let canonical = &self.canonical[relation_id.0].load(); for mut tuple in local_relation.tuples_mut() { // Look for the most recent version for this domain in the canonical relation. let canon_tuple = canonical.seek_by_domain(tuple.domain().clone()); @@ -246,24 +266,28 @@ impl RelBox { } } } - Ok(commitset) + Ok((commitset, commit_guard)) } /// Actually commit a transaction's (approved) CommitSet. If the underlying base relations have - /// changed since the transaction started, this will return `Err(RelationContentionConflict)` + /// changed since the commit set began, this will return `Err(RelationContentionConflict)` /// and the transaction can choose to try to produce a new CommitSet, or just abort. - pub(crate) fn try_commit(&self, commit_set: CommitSet) -> Result<(), CommitError> { - // swap the active canonical state with the new one. but only if the timestamps have not - // changed in the interim; we have to hold a lock while this is done. If any relations have - // had their ts change, we need to retry. - // We have to hold a lock during the duration of this. If we fail, we will loop back - // and retry. - let mut canonical = self.canonical.write().unwrap(); + pub(crate) fn try_commit( + &self, + commit_set: CommitSet, + _commit_guard: MutexGuard<()>, + ) -> Result<(), CommitError> { + // swap the active canonical state with the new one + // but only if the timestamps have not changed in the interim; + // we have to hold a lock while this is checked. If any relations have + // had their ts change, we probably need to retry because someone else got there first + // This shouldn't happen if we have a commit guard. for (_, relation) in commit_set.iter() { // Did the relation get committed to by someone else in the interim? If so, return // back to the transaction letting it know that, and it can decide if it wants to // retry. - if relation.ts != canonical[relation.id.0].ts { + // This shouldn't happen if we have a commit guard. + if relation.ts != self.canonical[relation.id.0].load().ts { return Err(CommitError::RelationContentionConflict); } } @@ -271,11 +295,11 @@ impl RelBox { // Everything passed, so we can commit the changes by swapping in the new canonical // before releasing the lock. let commit_ts = commit_set.ts; - for (_, relation) in commit_set.into_iter() { + for (_, mut relation) in commit_set.into_iter() { let idx = relation.id.0; - canonical[idx] = relation; - // And update the timestamp on the canonical relation. - canonical[idx].ts = commit_ts; + + relation.ts = commit_ts; + self.canonical[idx].store(Arc::new(relation)); } Ok(()) diff --git a/crates/db/src/rdb/tx/transaction.rs b/crates/db/src/rdb/tx/transaction.rs index d3454c4c..1b974a1a 100644 --- a/crates/db/src/rdb/tx/transaction.rs +++ b/crates/db/src/rdb/tx/transaction.rs @@ -12,10 +12,11 @@ // this program. If not, see . // -use std::cell::RefCell; +use std::cell::{Cell, RefCell}; use std::collections::HashSet; -use std::sync::Arc; -use std::time::Duration; +use std::marker::PhantomData; +use std::sync::{Arc, MutexGuard}; +use std::thread::yield_now; use thiserror::Error; @@ -30,6 +31,9 @@ use crate::rdb::tx::relvar::RelVar; use crate::rdb::tx::working_set::WorkingSet; use crate::rdb::RelationId; +type PhantomUnsync = PhantomData>; +type PhantomUnsend = PhantomData>; + /// A versioned transaction, which is a fork of the current canonical base relations. pub struct Transaction { /// Where we came from, for referencing back to the base relations. @@ -38,6 +42,9 @@ pub struct Transaction { /// to the transaction, and represents the set of values that will be committed to the base /// relations at commit time. pub(crate) working_set: RefCell>, + + unsend: PhantomUnsend, + unsync: PhantomUnsync, } /// Errors which can occur during a commit. @@ -59,6 +66,8 @@ impl Transaction { Self { db, working_set: RefCell::new(Some(ws)), + unsend: Default::default(), + unsync: Default::default(), } } @@ -78,11 +87,11 @@ impl Transaction { tries += 1; let commit_ts = self.db.clone().next_ts(); let mut working_set = self.working_set.borrow_mut(); - let commit_set = self + let (commit_set, commit_guard) = self .db .prepare_commit_set(commit_ts, working_set.as_mut().unwrap())?; - match self.db.try_commit(commit_set) { - Ok(_) => { + match self.db.try_commit(commit_set, commit_guard) { + Ok(()) => { let working_set = working_set.take().unwrap(); self.db.sync(commit_ts, working_set); return Ok(()); @@ -92,8 +101,7 @@ impl Transaction { return Err(CommitError::RelationContentionConflict); } else { // Release the lock, pause a bit, and retry the commit set. - drop(working_set); - std::thread::sleep(Duration::from_millis(5)); + yield_now(); continue 'retry; } } @@ -310,7 +318,7 @@ mod tests { // Verify canonical state. { - let relation = &db.canonical.read().unwrap()[0]; + let relation = &db.canonical[0].load(); let tuple = relation .seek_by_domain(attr(b"abc")) .expect("Expected tuple to exist"); diff --git a/crates/db/tests/rdb_restore.rs b/crates/db/tests/rdb_restore.rs index 2616f950..58bdc038 100644 --- a/crates/db/tests/rdb_restore.rs +++ b/crates/db/tests/rdb_restore.rs @@ -17,7 +17,6 @@ mod test { use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; - use std::time::Duration; use tracing_test::traced_test; use moor_db::rdb::{RelBox, RelationInfo}; @@ -61,11 +60,8 @@ mod test { .insert_tuple(from_val(*value), from_val(*value)) .unwrap(); } - Value::r(_, register, _) => { - let relation = RelationId(*register as usize); - - // Full-scan. - tx.relation(relation).predicate_scan(&|_| true).unwrap(); + Value::r(_, _, _) => { + continue; } } } @@ -129,10 +125,6 @@ mod test { expected }; - // Sleep for a bit to encourage any buffers to flush, etc. - // TODO: ew. - std::thread::sleep(Duration::from_secs(1)); - // Verify the WAL directory is not empty. assert!(std::fs::read_dir(format!("{}/wal", tmpdir_str)) .unwrap() @@ -144,23 +136,24 @@ mod test { for _ in 0..5 { let db = test_db(tmpdir.path().into()); - // Verify the pages directory is not empty after recovery. + // Verify the pages directory is not empty after recovery, but that the WAL directory is let pages = std::fs::read_dir(format!("{}/pages", tmpdir_str)); let num_pages = pages.unwrap().count(); assert_ne!(num_pages, 0); - let tx = db.clone().start_tx(); // Verify all the tuples in all the relations are there for relation in tuples.keys() { let expected_tuples = tuples.get(relation).unwrap(); - let rel = tx.relation(*relation); - for t in expected_tuples { - let domain = from_val(*t); - let v = rel - .seek_by_domain(domain.clone()) - .expect("Missing expected tuple value"); - assert_eq!(domain, v.domain()); - } + + db.with_relation(*relation, |r| { + for k in expected_tuples { + let t = from_val(*k); + let v = r + .seek_by_domain(t.clone()) + .unwrap_or_else(|| panic!("Tup {:?} not found", k)); + assert_eq!(v.domain(), t); + } + }); } db.shutdown(); }