diff --git a/rust/template/api.rs b/rust/template/api.rs index 3d1726afe..d418a7f93 100644 --- a/rust/template/api.rs +++ b/rust/template/api.rs @@ -29,7 +29,9 @@ pub struct HDDlog { pub replay_file: Option> } +/* Public API */ impl HDDlog { + pub fn print_err(f: Option, msg: &str) { match f { None => eprintln!("{}", msg), @@ -41,8 +43,335 @@ impl HDDlog { { Self::print_err(self.print_err, msg) } + + pub fn get_table_id(tname: &str) -> Result + { + relname2id(tname).ok_or_else(||format!("unknown relation {}", tname)) + } + + // TODO wrap the C API on this + pub fn run(workers: usize, + do_store: bool, + cb: Option) -> HDDlog + where F: Callback + { + let workers = if workers == 0 { 1 } else { workers }; + + let db: Arc> = Arc::new(Mutex::new(ValMap::new())); + let db2 = db.clone(); + + let deltadb: Arc>> = Arc::new(Mutex::new(None)); + let deltadb2 = deltadb.clone(); + + let handler: Box> = { + let handler_generator = move || { + let mut nhandlers: usize = 1; + + /* Always use delta handler, which costs nothing unless it is + * actually used*/ + let delta_handler = DeltaUpdateHandler::new(deltadb2); + + let store_handler = if do_store { + nhandlers = nhandlers + 1; + Some(ValMapUpdateHandler::new(db2)) + } else { + None + }; + let cb_handler = cb.map(|f| {nhandlers+=1; CallbackUpdateHandler::new(f)}); + + let handler: Box> = if nhandlers == 1 { + Box::new(delta_handler) + } else { + let mut handlers: Vec>> = Vec::new(); + handlers.push(Box::new(delta_handler)); + store_handler.map(|h| handlers.push(Box::new(h))); + cb_handler.map(|h| handlers.push(Box::new(h))); + Box::new(ChainedUpdateHandler::new(handlers)) + }; + handler + }; + Box::new(ThreadUpdateHandler::new(handler_generator)) + }; + + let program = prog(handler.mt_update_cb()); + + /* Notify handler about initial transaction */ + handler.before_commit(); + let prog = program.run(workers as usize); + handler.after_commit(true); + + HDDlog{ + prog: Mutex::new(prog), + update_handler: handler, + db: Some(db), + deltadb: deltadb, + print_err: None, + replay_file: None} + } + + pub fn record_commands(&mut self, file: &mut Option>) + { + mem::swap(&mut self.replay_file, file); + } + + pub fn dump_input_snapshot(&self, w: &mut W) -> io::Result<()> + { + for (rel, relname) in INPUT_RELIDMAP.iter() { + let prog = self.prog.lock().unwrap(); + match prog.get_input_relation_data(*rel as RelId) { + Ok(valset) => { + for v in valset.iter() { + writeln!(w, "insert {}[{}],", relname, v)?; + } + }, + _ => match prog.get_input_relation_index(*rel as RelId) { + Ok(ivalset) => { + for v in ivalset.values() { + writeln!(w, "insert {}[{}],", relname, v)?; + } + }, + _ => { + panic!("Unknown input relation {:?} in dump_input_snapshot", rel); + } + } + } + }; + Ok(()) + } + + pub fn stop(self) -> Result<(), String> { + self.prog.into_inner().map(|p|p.stop()).unwrap() + } + + pub fn transaction_start(&self) -> Result<(), String> + { + self.record_transaction_start(); + self.prog.lock().unwrap().transaction_start() + } + + pub fn transaction_commit_dump_changes + (&self, cb: Option) -> Result<(), String> + where F: FnMut(usize, &record::Record, bool) + { + self.record_transaction_commit(true); + *self.deltadb.lock().unwrap() = Some(DeltaMap::new()); + + self.update_handler.before_commit(); + match (self.prog.lock().unwrap().transaction_commit()) { + Ok(()) => { + self.update_handler.after_commit(true); + let mut delta = self.deltadb.lock().unwrap(); + Self::dump_delta(delta.as_mut().unwrap(), cb); + *delta = None; + Ok(()) + }, + Err(e) => { + self.update_handler.after_commit(false); + Err(e) + } + } + } + + pub fn transaction_commit(&self) -> Result<(), String> { + self.record_transaction_commit(false); + self.update_handler.before_commit(); + + match (self.prog.lock().unwrap().transaction_commit()) { + Ok(()) => { + self.update_handler.after_commit(true); + Ok(()) + }, + Err(e) => { + self.update_handler.after_commit(false); + Err(e) + } + } + } + + pub fn transaction_rollback(&self) -> Result<(), String> { + let _ = self.record_transaction_rollback(); + self.prog.lock().unwrap().transaction_rollback() + } + + pub fn apply_updates<'a, I: iter::ExactSizeIterator + Clone>(&self, upds: I) -> Result<(), String>{ + self.record_updates(upds.clone()); + + let upds_vec: Result, _> = upds.map(|upd| updcmd2upd(upd)) + .collect(); + let upds = upds_vec?; + self.prog.lock().unwrap().apply_updates(upds) + } + + pub fn clear_relation(&self, table: usize) -> Result<(), String> { + self.record_clear_relation(table); + self.prog.lock().unwrap().clear_relation(table) + } + + pub fn dump_table(&self, table: usize, cb: Option) -> Result<(), &str> + where F: Fn(&record::Record) -> bool + { + self.record_dump_table(table); + if let Some(ref db) = self.db { + HDDlog::db_dump_table(&mut db.lock().unwrap(), table, cb); + Ok(()) + } else { + Err("cannot dump table: ddlog_run() was invoked with do_store flag set to false") + } + } + + /* + * Controls recording of differential operator runtimes. When enabled, + * DDlog records each activation of every operator and prints the + * per-operator CPU usage summary in the profile. When disabled, the + * recording stops, but the previously accumulated profile is preserved. + * + * Recording CPU events can be expensive in large dataflows and is + * therefore disabled by default. + */ + pub fn enable_cpu_profiling(&self, enable: bool) { + self.record_enable_cpu_profiling(enable); + self.prog.lock().unwrap().enable_cpu_profiling(enable); + } + + /* + * returns DDlog program runtime profile + */ + pub fn profile(&self) -> String { + self.record_profile(); + let rprog = self.prog.lock().unwrap(); + let profile: String = rprog.profile.lock().unwrap().to_string(); + profile + } } +/* Internals */ +impl HDDlog { + fn dump_delta(db: &mut DeltaMap, + cb: Option) + where F:FnMut(usize, &record::Record, bool) + { + cb.map(|mut f| + for (table_id, table_data) in db.as_ref().iter() { + for (val, weight) in table_data.iter() { + debug_assert!(*weight == 1 || *weight == -1); + f(*table_id as libc::size_t, + &val.clone().into_record(), + *weight == 1); + } + }); + } + + fn db_dump_table(db: &mut ValMap, + table: libc::size_t, + cb: Option) + where F:Fn(&record::Record) -> bool + { + cb.map(|f| + for val in db.get_rel(table) { + if !f(&val.clone().into_record()) { + break; + } + }); + } + + fn record_transaction_start(&self) { + if let Some(ref f) = self.replay_file { + if writeln!(f.lock().unwrap(), "start;").is_err() { + self.eprintln("failed to record invocation in replay file"); + } + } + } + + fn record_transaction_commit(&self, record_changes: bool) { + if let Some(ref f) = self.replay_file { + let res = if record_changes { + writeln!(f.lock().unwrap(), "commit dump_changes;") + } else { + writeln!(f.lock().unwrap(), "commit;") + }; + if res.is_err() { + self.eprintln("failed to record invocation in replay file"); + } + } + } + + fn record_transaction_rollback(&self) -> Result<(), String> { + if let Some(ref f) = self.replay_file { + if writeln!(f.lock().unwrap(), "rollback;").is_err() { + Err("failed to record invocation in replay file".to_string()) + } else { + Ok(()) + } + } else { + Ok(()) + } + } + + fn record_updates<'a, I: iter::ExactSizeIterator>(&self, upds: I) { + let n = upds.len(); + + if let Some(ref f) = self.replay_file { + let mut file = f.lock().unwrap(); + for (i, u) in upds.enumerate() { + let sep = if i == n - 1 { ";" } else { "," }; + record_update(&mut *file, u); + let _ = writeln!(file, "{}", sep); + } + } + } + + fn record_clear_relation(&self, table:usize) { + if let Some(ref f) = self.replay_file { + if writeln!(f.lock().unwrap(), "clear {};", relid2name(table).unwrap_or(&"???")).is_err() { + self.eprintln("failed to record invocation in replay file"); + } + } + } + + fn record_dump_table(&self, table: usize) { + if let Some(ref f) = self.replay_file { + if writeln!(f.lock().unwrap(), "dump {};", relid2name(table).unwrap_or(&"???")).is_err() { + self.eprintln("ddlog_dump_table(): failed to record invocation in replay file"); + } + } + } + + fn record_enable_cpu_profiling(&self, enable: bool) { + if let Some(ref f) = self.replay_file { + if writeln!(f.lock().unwrap(), "profile cpu {};", if enable { "on" } else { "off" }).is_err() { + self.eprintln("ddlog_cpu_profiling_enable(): failed to record invocation in replay file"); + } + } + } + + fn record_profile(&self) { + if let Some(ref f) = self.replay_file { + if writeln!(f.lock().unwrap(), "profile;").is_err() { + self.eprintln("failed to record invocation in replay file"); + } + } + } +} + +pub fn record_update(file: &mut fs::File, upd: &record::UpdCmd) +{ + match upd { + record::UpdCmd::Insert(rel, record) => { + let _ = write!(file, "insert {}[{}]", relident2name(rel).unwrap_or(&"???"), record); + }, + record::UpdCmd::Delete(rel, record) => { + let _ = write!(file, "delete {}[{}]", relident2name(rel).unwrap_or(&"???"), record); + }, + record::UpdCmd::DeleteKey(rel, record) => { + let _ = write!(file, "delete_key {} {}", relident2name(rel).unwrap_or(&"???"), record); + }, + record::UpdCmd::Modify(rel, key, mutator) => { + let _ = write!(file, "modify {} {} <- {}", relident2name(rel).unwrap_or(&"???"), key, mutator); + } + } +} + + pub fn updcmd2upd(c: &record::UpdCmd) -> Result, String> { match c { @@ -83,7 +412,9 @@ fn relident2name(r: &record::RelIdentifier) -> Option<&str> { } } -fn __null_cb(_relid: RelId, _v: &Value, _w: isize) {} +/*************************************************** + * C bindings + ***************************************************/ #[no_mangle] pub unsafe extern "C" fn ddlog_get_table_id(tname: *const raw::c_char) -> libc::size_t @@ -100,13 +431,6 @@ pub unsafe extern "C" fn ddlog_get_table_id(tname: *const raw::c_char) -> libc:: } } -impl HDDlog { - pub fn get_table_id(tname: &str) -> Result - { - relname2id(tname).ok_or_else(||format!("unknown relation {}", tname)) - } -} - #[no_mangle] pub extern "C" fn ddlog_run( workers: raw::c_uint, @@ -172,75 +496,6 @@ pub extern "C" fn ddlog_run( replay_file: None})) } -// TODO wrap the C API on this -impl HDDlog { - pub fn run(workers: usize, - do_store: bool, - cb: Option) -> HDDlog - where F: Callback - { - let workers = if workers == 0 { 1 } else { workers }; - - let db: Arc> = Arc::new(Mutex::new(ValMap::new())); - let db2 = db.clone(); - - let deltadb: Arc>> = Arc::new(Mutex::new(None)); - let deltadb2 = deltadb.clone(); - - let handler: Box> = { - let handler_generator = move || { - let mut nhandlers: usize = 1; - - /* Always use delta handler, which costs nothing unless it is - * actually used*/ - let delta_handler = DeltaUpdateHandler::new(deltadb2); - - let store_handler = if do_store { - nhandlers = nhandlers + 1; - Some(ValMapUpdateHandler::new(db2)) - } else { - None - }; - let cb_handler = cb.map(|f| {nhandlers+=1; CallbackUpdateHandler::new(f)}); - - let handler: Box> = if nhandlers == 1 { - Box::new(delta_handler) - } else { - let mut handlers: Vec>> = Vec::new(); - handlers.push(Box::new(delta_handler)); - store_handler.map(|h| handlers.push(Box::new(h))); - cb_handler.map(|h| handlers.push(Box::new(h))); - Box::new(ChainedUpdateHandler::new(handlers)) - }; - handler - }; - Box::new(ThreadUpdateHandler::new(handler_generator)) - }; - - let program = prog(handler.mt_update_cb()); - - /* Notify handler about initial transaction */ - handler.before_commit(); - let prog = program.run(workers as usize); - handler.after_commit(true); - - HDDlog{ - prog: Mutex::new(prog), - update_handler: handler, - db: Some(db), - deltadb: deltadb, - print_err: None, - replay_file: None} - } -} - -impl HDDlog { - pub fn record_commands(&mut self, file: &mut Option>) - { - mem::swap(&mut self.replay_file, file); - } -} - #[no_mangle] pub unsafe extern "C" fn ddlog_record_commands(prog: *const HDDlog, fd: unix::io::RawFd) -> raw::c_int { @@ -279,43 +534,16 @@ pub unsafe extern "C" fn ddlog_dump_input_snapshot(prog: *const HDDlog, fd: unix return -1; }; let prog = Arc::from_raw(prog); - let mut file = fs::File::from_raw_fd(fd); - let res = prog.dump_input_snapshot(&mut file) - .map(|_|0) - .unwrap_or_else(|e| { - prog.eprintln(&format!("ddlog_dump_input_snapshot: error: {}", e)); - -1 - }); - file.into_raw_fd(); - Arc::into_raw(prog); - res -} - -impl HDDlog { - pub fn dump_input_snapshot(&self, w: &mut W) -> io::Result<()> - { - for (rel, relname) in INPUT_RELIDMAP.iter() { - let prog = self.prog.lock().unwrap(); - match prog.get_input_relation_data(*rel as RelId) { - Ok(valset) => { - for v in valset.iter() { - writeln!(w, "insert {}[{}],", relname, v)?; - } - }, - _ => match prog.get_input_relation_index(*rel as RelId) { - Ok(ivalset) => { - for v in ivalset.values() { - writeln!(w, "insert {}[{}],", relname, v)?; - } - }, - _ => { - panic!("Unknown input relation {:?} in dump_input_snapshot", rel); - } - } - } - }; - Ok(()) - } + let mut file = fs::File::from_raw_fd(fd); + let res = prog.dump_input_snapshot(&mut file) + .map(|_|0) + .unwrap_or_else(|e| { + prog.eprintln(&format!("ddlog_dump_input_snapshot: error: {}", e)); + -1 + }); + file.into_raw_fd(); + Arc::into_raw(prog); + res } #[no_mangle] @@ -347,12 +575,6 @@ pub unsafe extern "C" fn ddlog_stop(prog: *const HDDlog) -> raw::c_int } } -impl HDDlog { - pub fn stop(self) -> Result<(), String> { - self.prog.into_inner().map(|p|p.stop()).unwrap() - } -} - #[no_mangle] pub unsafe extern "C" fn ddlog_transaction_start(prog: *const HDDlog) -> raw::c_int { @@ -370,22 +592,6 @@ pub unsafe extern "C" fn ddlog_transaction_start(prog: *const HDDlog) -> raw::c_ res } -impl HDDlog { - pub fn transaction_start(&self) -> Result<(), String> - { - self.record_transaction_start(); - self.prog.lock().unwrap().transaction_start() - } - - fn record_transaction_start(&self) { - if let Some(ref f) = self.replay_file { - if writeln!(f.lock().unwrap(), "start;").is_err() { - self.eprintln("failed to record invocation in replay file"); - } - } - } -} - #[no_mangle] pub unsafe extern "C" fn ddlog_transaction_commit_dump_changes( prog: *const HDDlog, @@ -414,46 +620,6 @@ pub unsafe extern "C" fn ddlog_transaction_commit_dump_changes( res } -impl HDDlog { - pub fn transaction_commit_dump_changes - (&self, cb: Option) -> Result<(), String> - where F: FnMut(usize, &record::Record, bool) - { - self.record_transaction_commit(true); - *self.deltadb.lock().unwrap() = Some(DeltaMap::new()); - - self.update_handler.before_commit(); - match (self.prog.lock().unwrap().transaction_commit()) { - Ok(()) => { - self.update_handler.after_commit(true); - let mut delta = self.deltadb.lock().unwrap(); - HDDlog::dump_delta(delta.as_mut().unwrap(), cb); - *delta = None; - Ok(()) - }, - Err(e) => { - self.update_handler.after_commit(false); - Err(e) - } - } - } - - fn dump_delta(db: &mut DeltaMap, - cb: Option) - where F:FnMut(usize, &record::Record, bool) - { - cb.map(|mut f| - for (table_id, table_data) in db.as_ref().iter() { - for (val, weight) in table_data.iter() { - debug_assert!(*weight == 1 || *weight == -1); - f(*table_id as libc::size_t, - &val.clone().into_record(), - *weight == 1); - } - }); - } -} - #[no_mangle] pub unsafe extern "C" fn ddlog_transaction_commit(prog: *const HDDlog) -> raw::c_int { @@ -471,37 +637,6 @@ pub unsafe extern "C" fn ddlog_transaction_commit(prog: *const HDDlog) -> raw::c res } -impl HDDlog { - pub fn transaction_commit(&self) -> Result<(), String> { - self.record_transaction_commit(false); - self.update_handler.before_commit(); - - match (self.prog.lock().unwrap().transaction_commit()) { - Ok(()) => { - self.update_handler.after_commit(true); - Ok(()) - }, - Err(e) => { - self.update_handler.after_commit(false); - Err(e) - } - } - } - - fn record_transaction_commit(&self, record_changes: bool) { - if let Some(ref f) = self.replay_file { - let res = if record_changes { - writeln!(f.lock().unwrap(), "commit dump_changes;") - } else { - writeln!(f.lock().unwrap(), "commit;") - }; - if res.is_err() { - self.eprintln("failed to record invocation in replay file"); - } - } - } -} - #[no_mangle] pub unsafe extern "C" fn ddlog_transaction_rollback(prog: *const HDDlog) -> raw::c_int { @@ -518,25 +653,6 @@ pub unsafe extern "C" fn ddlog_transaction_rollback(prog: *const HDDlog) -> raw: res } -impl HDDlog { - pub fn transaction_rollback(&self) -> Result<(), String> { - let _ = self.record_transaction_rollback(); - self.prog.lock().unwrap().transaction_rollback() - } - - fn record_transaction_rollback(&self) -> Result<(), String> { - if let Some(ref f) = self.replay_file { - if writeln!(f.lock().unwrap(), "rollback;").is_err() { - Err("failed to record invocation in replay file".to_string()) - } else { - Ok(()) - } - } else { - Ok(()) - } - } -} - #[no_mangle] pub unsafe extern "C" fn ddlog_apply_updates(prog: *const HDDlog, upds: *const *mut record::UpdCmd, n: size_t) -> raw::c_int { @@ -560,48 +676,6 @@ pub unsafe extern "C" fn ddlog_apply_updates(prog: *const HDDlog, upds: *const * res } -impl HDDlog { - pub fn apply_updates<'a, I: iter::ExactSizeIterator + Clone>(&self, upds: I) -> Result<(), String>{ - self.record_updates(upds.clone()); - - let upds_vec: Result, _> = upds.map(|upd| updcmd2upd(upd)) - .collect(); - let upds = upds_vec?; - self.prog.lock().unwrap().apply_updates(upds) - } - - pub fn record_updates<'a, I: iter::ExactSizeIterator>(&self, upds: I) { - let n = upds.len(); - - if let Some(ref f) = self.replay_file { - let mut file = f.lock().unwrap(); - for (i, u) in upds.enumerate() { - let sep = if i == n - 1 { ";" } else { "," }; - record_update(&mut *file, u); - let _ = writeln!(file, "{}", sep); - } - } - } -} - -pub fn record_update(file: &mut fs::File, upd: &record::UpdCmd) -{ - match upd { - record::UpdCmd::Insert(rel, record) => { - let _ = write!(file, "insert {}[{}]", relident2name(rel).unwrap_or(&"???"), record); - }, - record::UpdCmd::Delete(rel, record) => { - let _ = write!(file, "delete {}[{}]", relident2name(rel).unwrap_or(&"???"), record); - }, - record::UpdCmd::DeleteKey(rel, record) => { - let _ = write!(file, "delete_key {} {}", relident2name(rel).unwrap_or(&"???"), record); - }, - record::UpdCmd::Modify(rel, key, mutator) => { - let _ = write!(file, "modify {} {} <- {}", relident2name(rel).unwrap_or(&"???"), key, mutator); - } - } -} - #[no_mangle] pub unsafe extern "C" fn ddlog_clear_relation( prog: *const HDDlog, @@ -620,21 +694,6 @@ pub unsafe extern "C" fn ddlog_clear_relation( res } -impl HDDlog { - pub fn clear_relation(&self, table: usize) -> Result<(), String> { - self.record_clear_relation(table); - self.prog.lock().unwrap().clear_relation(table) - } - - fn record_clear_relation(&self, table:usize) { - if let Some(ref f) = self.replay_file { - if writeln!(f.lock().unwrap(), "clear {};", relid2name(table).unwrap_or(&"???")).is_err() { - self.eprintln("failed to record invocation in replay file"); - } - } - } -} - #[no_mangle] pub unsafe extern "C" fn ddlog_dump_table( prog: *const HDDlog, @@ -658,41 +717,6 @@ pub unsafe extern "C" fn ddlog_dump_table( res } -impl HDDlog { - pub fn dump_table(&self, table: usize, cb: Option) -> Result<(), &str> - where F: Fn(&record::Record) -> bool - { - self.record_dump_table(table); - if let Some(ref db) = self.db { - HDDlog::db_dump_table(&mut db.lock().unwrap(), table, cb); - Ok(()) - } else { - Err("cannot dump table: ddlog_run() was invoked with do_store flag set to false") - } - } - - fn record_dump_table(&self, table: usize) { - if let Some(ref f) = self.replay_file { - if writeln!(f.lock().unwrap(), "dump {};", relid2name(table).unwrap_or(&"???")).is_err() { - self.eprintln("ddlog_dump_table(): failed to record invocation in replay file"); - } - } - } - - fn db_dump_table(db: &mut ValMap, - table: libc::size_t, - cb: Option) - where F:Fn(&record::Record) -> bool - { - cb.map(|f| - for val in db.get_rel(table) { - if !f(&val.clone().into_record()) { - break; - } - }); - } -} - #[no_mangle] pub unsafe extern "C" fn ddlog_enable_cpu_profiling(prog: *const HDDlog, enable: bool) -> raw::c_int { @@ -707,30 +731,6 @@ pub unsafe extern "C" fn ddlog_enable_cpu_profiling(prog: *const HDDlog, enable: 0 } -impl HDDlog { - /* - * Controls recording of differential operator runtimes. When enabled, - * DDlog records each activation of every operator and prints the - * per-operator CPU usage summary in the profile. When disabled, the - * recording stops, but the previously accumulated profile is preserved. - * - * Recording CPU events can be expensive in large dataflows and is - * therefore disabled by default. - */ - pub fn enable_cpu_profiling(&self, enable: bool) { - self.record_enable_cpu_profiling(enable); - self.prog.lock().unwrap().enable_cpu_profiling(enable); - } - - fn record_enable_cpu_profiling(&self, enable: bool) { - if let Some(ref f) = self.replay_file { - if writeln!(f.lock().unwrap(), "profile cpu {};", if enable { "on" } else { "off" }).is_err() { - self.eprintln("ddlog_cpu_profiling_enable(): failed to record invocation in replay file"); - } - } - } -} - #[no_mangle] pub unsafe extern "C" fn ddlog_profile(prog: *const HDDlog) -> *const raw::c_char { @@ -747,26 +747,6 @@ pub unsafe extern "C" fn ddlog_profile(prog: *const HDDlog) -> *const raw::c_cha res } -impl HDDlog { - /* - * returns DDlog program runtime profile - */ - pub fn profile(&self) -> String { - self.record_profile(); - let rprog = self.prog.lock().unwrap(); - let profile: String = rprog.profile.lock().unwrap().to_string(); - profile - } - - fn record_profile(&self) { - if let Some(ref f) = self.replay_file { - if writeln!(f.lock().unwrap(), "profile;").is_err() { - self.eprintln("failed to record invocation in replay file"); - } - } - } -} - #[no_mangle] pub unsafe extern "C" fn ddlog_string_free(s: *mut raw::c_char) {