Skip to content

Commit

Permalink
Confine EventRecord usage to the recorder impl.
Browse files Browse the repository at this point in the history
Signed-off-by: Matej Pavlovic <[email protected]>
  • Loading branch information
matejpavlovic committed Nov 28, 2023
1 parent 34d0483 commit 5287f2b
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 103 deletions.
3 changes: 2 additions & 1 deletion pkg/eventlog/eventwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package eventlog
import (
"compress/gzip"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
t "github.com/filecoin-project/mir/pkg/types"
)

type EventWriter interface {
Write(record EventRecord) (EventRecord, error)
Write(evts *events.EventList, timestamp int64) (*events.EventList, error)
Flush() error
Close() error
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/eventlog/eventwritergzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/pb/recordingpb"
t "github.com/filecoin-project/mir/pkg/types"
Expand All @@ -35,21 +36,21 @@ func NewGzipWriter(filename string, compressionLevel int, nodeID t.NodeID, logge
}, nil
}

func (w *gzipWriter) Write(record EventRecord) (EventRecord, error) {
func (w *gzipWriter) Write(evts *events.EventList, timestamp int64) (*events.EventList, error) {
gzWriter, err := gzip.NewWriterLevel(w.dest, w.compressionLevel)
if err != nil {
return record, err
return nil, err
}
defer func() {
if err := gzWriter.Close(); err != nil {
w.logger.Log(logging.LevelError, "Error closing gzWriter.", "err", err)
}
}()

return record, writeRecordedEvent(gzWriter, &recordingpb.Entry{
return evts, writeRecordedEvent(gzWriter, &recordingpb.Entry{
NodeId: w.nodeID.Pb(),
Time: record.Time,
Events: record.Events.Slice(),
Time: timestamp,
Events: evts.Slice(),
})
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/eventlog/eventwritersqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
_ "github.com/mattn/go-sqlite3" // Driver for the sql database
"google.golang.org/protobuf/encoding/protojson"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
t "github.com/filecoin-project/mir/pkg/types"
)
Expand Down Expand Up @@ -43,27 +44,27 @@ func NewSqliteWriter(filename string, nodeID t.NodeID, logger logging.Logger) (E
}, nil
}

func (w sqliteWriter) Write(record EventRecord) (EventRecord, error) {
func (w sqliteWriter) Write(evts *events.EventList, timestamp int64) (*events.EventList, error) {
// For each incoming event
iter := record.Events.Iterator()
iter := evts.Iterator()
for event := iter.Next(); event != nil; event = iter.Next() {
jsonData, err := protojson.Marshal(event)
if err != nil {
return record, err
return nil, err
}

_, err = w.db.Exec(
insert,
record.Time,
timestamp,
w.nodeID,
fmt.Sprintf("%T", event.Type)[len("*eventpb.Event_"):],
jsonData,
)
if err != nil {
return record, err
return nil, err
}
}
return record, nil
return evts, nil
}

func (w sqliteWriter) Flush() error {
Expand Down
65 changes: 25 additions & 40 deletions pkg/eventlog/newfilepolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
t "github.com/filecoin-project/mir/pkg/types"
)

// Returns a file that splits an record slice into multiple slices
// every time a an event eventpb.Event_NewLogFile is found
func EventNewEpochLogger(appModuleID t.ModuleID) func(record EventRecord) []EventRecord {
// EventNewEpochLogger returns a file that splits an event list into multiple lists
// every time an event eventpb.Event_NewLogFile is found
func EventNewEpochLogger(appModuleID t.ModuleID) func(*events.EventList) []*events.EventList {
eventNewLogFileLogger := func(event *eventpb.Event) bool {
appEvent, ok := event.Type.(*eventpb.Event_App)
if !ok {
Expand All @@ -22,32 +22,25 @@ func EventNewEpochLogger(appModuleID t.ModuleID) func(record EventRecord) []Even
return EventTrackerLogger(eventNewLogFileLogger)
}

// eventTrackerLogger returns a function that tracks every single event of EventRecord and
// EventTrackerLogger returns a function that tracks every single event of EventList and
// creates a new file for every event such that newFile(event) = True
func EventTrackerLogger(newFile func(event *eventpb.Event) bool) func(time EventRecord) []EventRecord {
return func(record EventRecord) []EventRecord {
var result []EventRecord
// Create a variable to hold the current chunk
currentChunk := &EventRecord{
Time: record.Time,
Events: events.EmptyList(),
}
func EventTrackerLogger(newFile func(event *eventpb.Event) bool) func(*events.EventList) []*events.EventList {
return func(evts *events.EventList) []*events.EventList {
var result []*events.EventList
currentChunk := events.EmptyList()

for _, event := range record.Events.Slice() {
for _, event := range evts.Slice() {
if newFile(event) {
result = append(result, *currentChunk)
currentChunk = &EventRecord{
Time: record.Time,
Events: events.EmptyList().PushBack(event),
}
result = append(result, currentChunk)
currentChunk = events.ListOf(event)
} else {
currentChunk.Events.PushBack(event)
currentChunk.PushBack(event)
}
}

// If there is a remaining chunk with fewer than the desired number of events, append it to the result
if currentChunk.Events.Len() > 0 {
result = append(result, *currentChunk)
if currentChunk.Len() > 0 {
result = append(result, currentChunk)
}

return result
Expand All @@ -56,43 +49,35 @@ func EventTrackerLogger(newFile func(event *eventpb.Event) bool) func(time Event

// EventLimitLogger returns a function for the interceptor that splits the logging file
// every eventLimit number of events
func EventLimitLogger(eventLimit int64) func(EventRecord) []EventRecord {
func EventLimitLogger(eventLimit int64) func(*events.EventList) []*events.EventList {
var eventCount int64
return func(record EventRecord) []EventRecord {
// Create a slice to hold the slices of record elements
var result []EventRecord
// Create a variable to hold the current chunk
currentChunk := EventRecord{
Time: record.Time,
Events: events.EmptyList(),
}
return func(evts *events.EventList) []*events.EventList {
var result []*events.EventList
currentChunk := events.EmptyList()

// Iterate over the events in the input slice
for _, event := range record.Events.Slice() {
for _, event := range evts.Slice() {
// Add the current element to the current chunk
currentChunk.Events.PushBack(event)
currentChunk.PushBack(event)
eventCount++
// If the current chunk has the desired number of events, append it to the result and start a new chunk
if eventCount%eventLimit == 0 {
result = append(result, currentChunk)
currentChunk = EventRecord{
Time: record.Time,
Events: events.EmptyList(),
}
currentChunk = events.EmptyList()
}
}

// If there is a remaining chunk with fewer than the desired number of events, append it to the result
if currentChunk.Events.Len() > 0 {
if currentChunk.Len() > 0 {
result = append(result, currentChunk)
}

return result
}
}

func OneFileLogger() func(EventRecord) []EventRecord {
return func(record EventRecord) []EventRecord {
return []EventRecord{record}
func OneFileLogger() func(*events.EventList) []*events.EventList {
return func(evts *events.EventList) []*events.EventList {
return []*events.EventList{evts}
}
}
Loading

0 comments on commit 5287f2b

Please sign in to comment.