Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Apr 20, 2024
1 parent c29fcb5 commit fd2a6f9
Show file tree
Hide file tree
Showing 16 changed files with 1,752 additions and 1,411 deletions.
24 changes: 12 additions & 12 deletions common/event_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"sync"
)

type OnStoreSuccess func(index uint64, event *edge_ctrl_pb.DataState_Event)
type OnStoreSuccess func(index uint64, event *edge_ctrl_pb.DataState_ChangeSet)

type EventCache interface {
// Store allows storage of an event and execution of an onSuccess callback while the event cache remains locked.
// onSuccess may be nil. This function is blocking.
Store(event *edge_ctrl_pb.DataState_Event, onSuccess OnStoreSuccess) error
Store(event *edge_ctrl_pb.DataState_ChangeSet, onSuccess OnStoreSuccess) error

// CurrentIndex returns the latest event index applied. This function is blocking.
CurrentIndex() (uint64, bool)
Expand All @@ -20,7 +20,7 @@ type EventCache interface {
// An empty slice and true is returned in cases where the requested startIndex is the current index.
// An empty slice and false is returned in cases where the replay cannot be facilitated.
// This function is blocking.
ReplayFrom(startIndex uint64) ([]*edge_ctrl_pb.DataState_Event, bool)
ReplayFrom(startIndex uint64) ([]*edge_ctrl_pb.DataState_ChangeSet, bool)

// WhileLocked allows the execution of arbitrary functionality while the event cache is locked. This function
// is blocking.
Expand Down Expand Up @@ -55,7 +55,7 @@ func (cache *ForgetfulEventCache) WhileLocked(callback func(uint64, bool)) {
callback(cache.currentIndex())
}

func (cache *ForgetfulEventCache) Store(event *edge_ctrl_pb.DataState_Event, onSuccess OnStoreSuccess) error {
func (cache *ForgetfulEventCache) Store(event *edge_ctrl_pb.DataState_ChangeSet, onSuccess OnStoreSuccess) error {
cache.lock.Lock()
defer cache.lock.Unlock()

Expand All @@ -81,7 +81,7 @@ func (cache *ForgetfulEventCache) Store(event *edge_ctrl_pb.DataState_Event, onS
return nil
}

func (cache *ForgetfulEventCache) ReplayFrom(_ uint64) ([]*edge_ctrl_pb.DataState_Event, bool) {
func (cache *ForgetfulEventCache) ReplayFrom(_ uint64) ([]*edge_ctrl_pb.DataState_ChangeSet, bool) {
return nil, false
}

Expand All @@ -106,15 +106,15 @@ type LoggingEventCache struct {
HeadLogIndex uint64
LogSize uint64
Log []uint64
Events map[uint64]*edge_ctrl_pb.DataState_Event
Events map[uint64]*edge_ctrl_pb.DataState_ChangeSet
}

func NewLoggingEventCache(logSize uint64) *LoggingEventCache {
return &LoggingEventCache{
HeadLogIndex: 0,
LogSize: logSize,
Log: make([]uint64, logSize),
Events: map[uint64]*edge_ctrl_pb.DataState_Event{},
Events: map[uint64]*edge_ctrl_pb.DataState_ChangeSet{},
}
}

Expand All @@ -125,7 +125,7 @@ func (cache *LoggingEventCache) SetCurrentIndex(index uint64) {
cache.HeadLogIndex = 0
cache.Log = make([]uint64, cache.LogSize)
cache.Log[0] = index
cache.Events = map[uint64]*edge_ctrl_pb.DataState_Event{}
cache.Events = map[uint64]*edge_ctrl_pb.DataState_ChangeSet{}
}

func (cache *LoggingEventCache) WhileLocked(callback func(uint64, bool)) {
Expand All @@ -135,7 +135,7 @@ func (cache *LoggingEventCache) WhileLocked(callback func(uint64, bool)) {
callback(cache.currentIndex())
}

func (cache *LoggingEventCache) Store(event *edge_ctrl_pb.DataState_Event, onSuccess OnStoreSuccess) error {
func (cache *LoggingEventCache) Store(event *edge_ctrl_pb.DataState_ChangeSet, onSuccess OnStoreSuccess) error {
cache.lock.Lock()
defer cache.lock.Unlock()

Expand Down Expand Up @@ -188,7 +188,7 @@ func (cache *LoggingEventCache) currentIndex() (uint64, bool) {
return cache.Log[cache.HeadLogIndex], true
}

func (cache *LoggingEventCache) ReplayFrom(startIndex uint64) ([]*edge_ctrl_pb.DataState_Event, bool) {
func (cache *LoggingEventCache) ReplayFrom(startIndex uint64) ([]*edge_ctrl_pb.DataState_ChangeSet, bool) {
cache.lock.Lock()
defer cache.lock.Unlock()

Expand Down Expand Up @@ -219,15 +219,15 @@ func (cache *LoggingEventCache) ReplayFrom(startIndex uint64) ([]*edge_ctrl_pb.D

// ez replay
if *startLogIndex < cache.HeadLogIndex {
var result []*edge_ctrl_pb.DataState_Event
var result []*edge_ctrl_pb.DataState_ChangeSet
for _, key := range cache.Log[*startLogIndex:cache.HeadLogIndex] {
result = append(result, cache.Events[key])
}
return result, true
}

//looping replay
var result []*edge_ctrl_pb.DataState_Event
var result []*edge_ctrl_pb.DataState_ChangeSet
for _, key := range cache.Log[*startLogIndex:] {
result = append(result, cache.Events[key])
}
Expand Down
Loading

0 comments on commit fd2a6f9

Please sign in to comment.