-
Notifications
You must be signed in to change notification settings - Fork 0
/
reversibleio.c
116 lines (106 loc) · 3.4 KB
/
reversibleio.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/** \file reversibleio.c
* This file will contain the API that will use the iobuffers and the wrappers to make I/O operations reversible */
#include "iobuffer.h"
#include "wrappers.h"
#include "io_heap.h"
#include "core.h"
#include "list.h"
#include "dymelor.h"
#include "events.h"
#include "reversibleio.h"
///We have one heap for the unseekable
io_heap *io_h;
double* per_lp_horizon;
void reversibleio_init(){
unsigned i;
//we create the heap
per_lp_horizon=rsalloc(sizeof(double)*n_prc_tot);
memset(per_lp_horizon,-1,sizeof(double)*n_prc_tot);
io_h=io_heap_new(MIN_HEAP,n_prc_tot);
//we initialize the windows in each LP.
for(i=0;i<n_prc_tot;i++){
nblist_init(&LPS[i]->io_forward_window);
//nblist_init(LPS[i]->io_reverse_window);
//during the initialization we populate the heap with the forward windows
io_heap_add(io_h,&LPS[i]->io_forward_window);
}
}
void reversibleio_collect(int lp,double event_horizon, msg_t* to_msg){
//we save the new event horizon for the current lp
per_lp_horizon[lp]=event_horizon;
//we start reading the event list to get the events inside the global window.
msg_t *msg=to_msg;
while(list_prev(msg)!=NULL){
msg=list_prev(msg);
}
while(msg!=NULL && msg->timestamp<event_horizon && to_msg!=msg){
///for the forward window we extract the I/O operations from the heap according to their timestamp and execute them
nblist_merge(&LPS[lp]->io_forward_window,&msg->io_forward_window);
nblist_destroy(&msg->io_forward_window,destroy_iobuffer);
///for the reverse window we destroy the nblist since we do not need to roll the I/O operations back
//nblist_destroy(LPS[lp]->io_reverse_window,destroy_iobuffer);
msg=list_next(msg);
}
}
void reversibleio_rollback(msg_t *msg){
if(msg==NULL){
return;
}
////For stream files we simply discard the forward window
if(msg->io_forward_window.head!=NULL){
nblist_destroy(&msg->io_forward_window,destroy_iobuffer);
}
///For seekable files we need to restore them using the backups in the reverse window
}
void reversibleio_execute(){
//we need to compute the minimum event horizon
unsigned int i;
double event_horizon;
event_horizon=-1;
for(i=0;i<n_prc_tot;i++){
if(per_lp_horizon[i]<event_horizon || event_horizon<0){
event_horizon=per_lp_horizon[i];
}
}
double timestamp=0;
iobuffer* buf=NULL;
while(timestamp<event_horizon){
buf=(iobuffer*)io_heap_poll(io_h);
if(buf!= NULL){
timestamp=buf->timestamp;
iobuffer_write(buf);
} else {
break;
}
}
}
///To flush all the queues we insert a dummy node in each of the and we retry extracting
void reversibleio_flush(){
unsigned int i;
double max_timestamp=0,tmp;
for(i=0;i<n_prc_tot;i++){
//we need to insert a dummy an element that is at least later than the current tail
tmp=LPS[i]->io_forward_window.tail->key;
if(tmp>max_timestamp){
max_timestamp=tmp+1;
}
nblist_add(&LPS[i]->io_forward_window,NULL,max_timestamp,NBLIST_DUMMY);
}
//then we execute these events, so if the horizon is correct we will get the I/O operation executed
reversibleio_execute();
}
void reversibleio_clean(){
unsigned int i;
for(i=0;i<n_prc_tot;i++){
nblist_clean(&LPS[i]->io_forward_window,destroy_iobuffer);
// TODO: can we keep these buffers?
//nblist_clean(LPS[i]->io_reverse_window,destroy_iobuffer);
}
}
void reversibleio_destroy(){
unsigned int i=0;
for(i=0;i<n_prc_tot;i++){
nblist_destroy(&LPS[i]->io_forward_window,destroy_iobuffer);
}
io_heap_delete(io_h);
}