Skip to content

Commit

Permalink
Unify the implementation of run across C and Rust
Browse files Browse the repository at this point in the history
C and Rust API now use a shared implementation of the `run` method
parameterized by `UpdateHandler` instance.
  • Loading branch information
ryzhyk committed Jul 2, 2019
1 parent 4a07f11 commit c66f104
Showing 1 changed file with 69 additions and 104 deletions.
173 changes: 69 additions & 104 deletions rust/template/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,58 +55,10 @@ impl HDDlog {
cb: Option<F>) -> HDDlog
where F: Callback
{
let workers = if workers == 0 { 1 } else { workers };

let db: Arc<Mutex<ValMap>> = Arc::new(Mutex::new(ValMap::new()));
let db2 = db.clone();

let deltadb: Arc<Mutex<Option<DeltaMap>>> = Arc::new(Mutex::new(None));
let deltadb2 = deltadb.clone();

let handler: Box<dyn IMTUpdateHandler<Value>> = {
let handler_generator = move || {
let mut nhandlers: usize = 1;

/* Always use delta handler, which costs nothing unless it is
* actually used*/
let delta_handler = DeltaUpdateHandler::new(deltadb2);

let store_handler = if do_store {
nhandlers = nhandlers + 1;
Some(ValMapUpdateHandler::new(db2))
} else {
None
};
let cb_handler = cb.map(|f| {nhandlers+=1; CallbackUpdateHandler::new(f)});

let handler: Box<dyn UpdateHandler<Value>> = if nhandlers == 1 {
Box::new(delta_handler)
} else {
let mut handlers: Vec<Box<dyn UpdateHandler<Value>>> = Vec::new();
handlers.push(Box::new(delta_handler));
store_handler.map(|h| handlers.push(Box::new(h)));
cb_handler.map(|h| handlers.push(Box::new(h)));
Box::new(ChainedUpdateHandler::new(handlers))
};
handler
};
Box::new(ThreadUpdateHandler::new(handler_generator))
};

let program = prog(handler.mt_update_cb());

/* Notify handler about initial transaction */
handler.before_commit();
let prog = program.run(workers as usize);
handler.after_commit(true);

HDDlog{
prog: Mutex::new(prog),
update_handler: handler,
db: Some(db),
deltadb: deltadb,
print_err: None,
replay_file: None}
Self::do_run(workers,
do_store,
cb.map(|f| CallbackUpdateHandler::new(f)),
None)
}

pub fn record_commands(&mut self, file: &mut Option<Mutex<fs::File>>)
Expand Down Expand Up @@ -246,6 +198,66 @@ impl HDDlog {

/* Internals */
impl HDDlog {
fn do_run<UH>(workers: usize,
do_store: bool,
cb: Option<UH>,
print_err: Option<extern "C" fn(msg: *const raw::c_char)>) -> HDDlog
where UH: UpdateHandler<Value> + Send + 'static
{
let workers = if workers == 0 { 1 } else { workers };

let db: Arc<Mutex<ValMap>> = Arc::new(Mutex::new(ValMap::new()));
let db2 = db.clone();

let deltadb: Arc<Mutex<Option<DeltaMap>>> = Arc::new(Mutex::new(None));
let deltadb2 = deltadb.clone();

let handler: Box<dyn IMTUpdateHandler<Value>> = {
let handler_generator = move || {
let mut nhandlers: usize = 1;

/* Always use delta handler, which costs nothing unless it is
* actually used*/
let delta_handler = DeltaUpdateHandler::new(deltadb2);

let store_handler = if do_store {
nhandlers = nhandlers + 1;
Some(ValMapUpdateHandler::new(db2))
} else {
None
};
let cb_handler = cb.map(|h| { nhandlers += 1; Box::new(h) as Box<dyn UpdateHandler<Value> + Send> });

let handler: Box<dyn UpdateHandler<Value>> = if nhandlers == 1 {
Box::new(delta_handler)
} else {
let mut handlers: Vec<Box<dyn UpdateHandler<Value>>> = Vec::new();
handlers.push(Box::new(delta_handler));
store_handler.map(|h| handlers.push(Box::new(h)));
cb_handler.map(|h| handlers.push(h));
Box::new(ChainedUpdateHandler::new(handlers))
};
handler
};
Box::new(ThreadUpdateHandler::new(handler_generator))
};

let program = prog(handler.mt_update_cb());

/* Notify handler about initial transaction */
handler.before_commit();
let prog = program.run(workers as usize);
handler.after_commit(true);

HDDlog{
prog: Mutex::new(prog),
update_handler: handler,
db: Some(db),
deltadb: deltadb,
print_err: print_err,
replay_file: None}
}

fn dump_delta<F>(db: &mut DeltaMap,
cb: Option<F>)
where F:FnMut(usize, &record::Record, bool)
Expand Down Expand Up @@ -442,58 +454,11 @@ pub extern "C" fn ddlog_run(
cb_arg: libc::uintptr_t,
print_err: Option<extern "C" fn(msg: *const raw::c_char)>) -> *const HDDlog
{
let workers = if workers == 0 { 1 } else { workers };

let db: Arc<Mutex<ValMap>> = Arc::new(Mutex::new(ValMap::new()));
let db2 = db.clone();

let deltadb: Arc<Mutex<Option<DeltaMap>>> = Arc::new(Mutex::new(None));
let deltadb2 = deltadb.clone();

let handler: Box<dyn IMTUpdateHandler<Value>> = {
let handler_generator = move || {
let mut nhandlers: usize = 1;

/* Always use delta handler, which costs nothing unless it is
* actually used*/
let delta_handler = DeltaUpdateHandler::new(deltadb2);

let store_handler = if do_store {
nhandlers = nhandlers + 1;
Some(ValMapUpdateHandler::new(db2))
} else {
None
};
let cb_handler = cb.map(|f| {nhandlers+=1; ExternCUpdateHandler::new(f, cb_arg)});

let handler: Box<dyn UpdateHandler<Value>> = if nhandlers == 1 {
Box::new(delta_handler)
} else {
let mut handlers: Vec<Box<dyn UpdateHandler<Value>>> = Vec::new();
handlers.push(Box::new(delta_handler));
store_handler.map(|h| handlers.push(Box::new(h)));
cb_handler.map(|h| handlers.push(Box::new(h)));
Box::new(ChainedUpdateHandler::new(handlers))
};
handler
};
Box::new(ThreadUpdateHandler::new(handler_generator))
};

let program = prog(handler.mt_update_cb());

/* Notify handler about initial transaction */
handler.before_commit();
let prog = program.run(workers as usize);
handler.after_commit(true);

Arc::into_raw(Arc::new(HDDlog{
prog: Mutex::new(prog),
update_handler: handler,
db: Some(db),
deltadb: deltadb,
print_err: print_err,
replay_file: None}))
Arc::into_raw(Arc::new(
HDDlog::do_run(workers as usize,
do_store,
cb.map(|f| ExternCUpdateHandler::new(f, cb_arg)),
print_err)))
}

#[no_mangle]
Expand Down

0 comments on commit c66f104

Please sign in to comment.