Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

modified existing functions to return an EventList for the modified interceptor for visualization #523

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/bench/stats/stat-interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ func NewStatInterceptor(s *LiveStats, txConsumer t.ModuleID) *StatInterceptor {
return &StatInterceptor{s, txConsumer}
}

func (i *StatInterceptor) Intercept(events *events.EventList) error {
func (i *StatInterceptor) Intercept(events *events.EventList) (*events.EventList, error) {

// Avoid nil dereference if Intercept is called on a nil *Recorder and simply do nothing.
// This can happen if a pointer type to *Recorder is assigned to a variable with the interface type Interceptor.
// Mir would treat that variable as non-nil, thinking there is an interceptor, and call Intercept() on it.
// For more explanation, see https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1
if i == nil {
return nil
return events, nil
}

it := events.Iterator()
Expand Down Expand Up @@ -62,5 +62,5 @@ func (i *StatInterceptor) Intercept(events *events.EventList) error {
}
}
}
return nil
return events, nil
}
9 changes: 7 additions & 2 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,21 +461,26 @@ func (n *Node) importEvents(

// If the interceptor module is present, passes events to it. Otherwise, does nothing.
// If an error occurs passing events to the interceptor, notifies the node by means of the workErrorNotifier.
// The interceptor has the ability to modify the EventList.
// The events returned by the interceptor are the events actually delivered to the system's modules
// Note: The passed Events should be free of any follow-up Events,
// as those will be intercepted separately when processed.
// Make sure to call the Strip method of the EventList before passing it to interceptEvents.
matejpavlovic marked this conversation as resolved.
Show resolved Hide resolved
func (n *Node) interceptEvents(events *events.EventList) {
func (n *Node) interceptEvents(events *events.EventList) *events.EventList {

// ATTENTION: n.interceptor is an interface type. If it is assigned the nil value of a concrete type,
// this condition will evaluate to true, and Intercept(events) will be called on nil.
// The implementation of the concrete type must make sure that calling Intercept even on the nil value
// does not cause any problems.
// For more explanation, see https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1
var err error
if n.interceptor != nil {
if err := n.interceptor.Intercept(events); err != nil {
events, err = n.interceptor.Intercept(events)
if err != nil {
n.workErrNotifier.Fail(err)
}
}
return events
}

func (n *Node) pauseInput() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/eventlog/eventwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package eventlog
import (
"compress/gzip"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
t "github.com/filecoin-project/mir/pkg/types"
)

type EventWriter interface {
Write(record EventRecord) error
Write(evts *events.EventList, timestamp int64) (*events.EventList, error)
Flush() error
Close() error
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/eventlog/eventwritergzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/pb/recordingpb"
t "github.com/filecoin-project/mir/pkg/types"
Expand All @@ -35,21 +36,21 @@ func NewGzipWriter(filename string, compressionLevel int, nodeID t.NodeID, logge
}, nil
}

func (w *gzipWriter) Write(record EventRecord) error {
func (w *gzipWriter) Write(evts *events.EventList, timestamp int64) (*events.EventList, error) {
gzWriter, err := gzip.NewWriterLevel(w.dest, w.compressionLevel)
if err != nil {
return err
return nil, err
}
defer func() {
if err := gzWriter.Close(); err != nil {
w.logger.Log(logging.LevelError, "Error closing gzWriter.", "err", err)
}
}()

return writeRecordedEvent(gzWriter, &recordingpb.Entry{
return evts, writeRecordedEvent(gzWriter, &recordingpb.Entry{
NodeId: w.nodeID.Pb(),
Time: record.Time,
Events: record.Events.Slice(),
Time: timestamp,
Events: evts.Slice(),
})
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/eventlog/eventwritersqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
_ "github.com/mattn/go-sqlite3" // Driver for the sql database
"google.golang.org/protobuf/encoding/protojson"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
t "github.com/filecoin-project/mir/pkg/types"
)
Expand Down Expand Up @@ -43,27 +44,27 @@ func NewSqliteWriter(filename string, nodeID t.NodeID, logger logging.Logger) (E
}, nil
}

func (w sqliteWriter) Write(record EventRecord) error {
func (w sqliteWriter) Write(evts *events.EventList, timestamp int64) (*events.EventList, error) {
// For each incoming event
iter := record.Events.Iterator()
iter := evts.Iterator()
for event := iter.Next(); event != nil; event = iter.Next() {
jsonData, err := protojson.Marshal(event)
if err != nil {
return err
return nil, err
}

_, err = w.db.Exec(
insert,
record.Time,
timestamp,
w.nodeID,
fmt.Sprintf("%T", event.Type)[len("*eventpb.Event_"):],
jsonData,
)
if err != nil {
return err
return nil, err
}
}
return nil
return evts, nil
}

func (w sqliteWriter) Flush() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/eventlog/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ type Interceptor interface {
// The implementation of the concrete type must make sure that calling Intercept even on the nil value
// does not cause any problems.
// For more explanation, see https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1
Intercept(events *events.EventList) error
Intercept(events *events.EventList) (*events.EventList, error)
}
13 changes: 7 additions & 6 deletions pkg/eventlog/multiinterceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,23 @@ type repeater struct {
interceptors []Interceptor
}

func (r *repeater) Intercept(events *events.EventList) error {
func (r *repeater) Intercept(events *events.EventList) (*events.EventList, error) {

// Avoid nil dereference if Intercept is called on a nil *Recorder and simply do nothing.
// This can happen if a pointer type to *Recorder is assigned to a variable with the interface type Interceptor.
// Mir would treat that variable as non-nil, thinking there is an interceptor, and call Intercept() on it.
// For more explanation, see https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1
if r == nil {
return nil
return events, nil
}

var err error
for _, i := range r.interceptors {
if err := i.Intercept(events); err != nil {
return err
events, err = i.Intercept(events)
if err != nil {
return nil, err
}
}
return nil
return events, nil
}

func MultiInterceptor(interceptors ...Interceptor) Interceptor {
Expand Down
65 changes: 25 additions & 40 deletions pkg/eventlog/newfilepolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
t "github.com/filecoin-project/mir/pkg/types"
)

// Returns a file that splits an record slice into multiple slices
// every time a an event eventpb.Event_NewLogFile is found
func EventNewEpochLogger(appModuleID t.ModuleID) func(record EventRecord) []EventRecord {
// EventNewEpochLogger returns a file that splits an event list into multiple lists
// every time an event eventpb.Event_NewLogFile is found
func EventNewEpochLogger(appModuleID t.ModuleID) func(*events.EventList) []*events.EventList {
eventNewLogFileLogger := func(event *eventpb.Event) bool {
appEvent, ok := event.Type.(*eventpb.Event_App)
if !ok {
Expand All @@ -22,32 +22,25 @@ func EventNewEpochLogger(appModuleID t.ModuleID) func(record EventRecord) []Even
return EventTrackerLogger(eventNewLogFileLogger)
}

// eventTrackerLogger returns a function that tracks every single event of EventRecord and
// EventTrackerLogger returns a function that tracks every single event of EventList and
// creates a new file for every event such that newFile(event) = True
func EventTrackerLogger(newFile func(event *eventpb.Event) bool) func(time EventRecord) []EventRecord {
return func(record EventRecord) []EventRecord {
var result []EventRecord
// Create a variable to hold the current chunk
currentChunk := &EventRecord{
Time: record.Time,
Events: events.EmptyList(),
}
func EventTrackerLogger(newFile func(event *eventpb.Event) bool) func(*events.EventList) []*events.EventList {
return func(evts *events.EventList) []*events.EventList {
var result []*events.EventList
currentChunk := events.EmptyList()

for _, event := range record.Events.Slice() {
for _, event := range evts.Slice() {
if newFile(event) {
result = append(result, *currentChunk)
currentChunk = &EventRecord{
Time: record.Time,
Events: events.EmptyList().PushBack(event),
}
result = append(result, currentChunk)
currentChunk = events.ListOf(event)
} else {
currentChunk.Events.PushBack(event)
currentChunk.PushBack(event)
}
}

// If there is a remaining chunk with fewer than the desired number of events, append it to the result
if currentChunk.Events.Len() > 0 {
result = append(result, *currentChunk)
if currentChunk.Len() > 0 {
result = append(result, currentChunk)
}

return result
Expand All @@ -56,43 +49,35 @@ func EventTrackerLogger(newFile func(event *eventpb.Event) bool) func(time Event

// EventLimitLogger returns a function for the interceptor that splits the logging file
// every eventLimit number of events
func EventLimitLogger(eventLimit int64) func(EventRecord) []EventRecord {
func EventLimitLogger(eventLimit int64) func(*events.EventList) []*events.EventList {
var eventCount int64
return func(record EventRecord) []EventRecord {
// Create a slice to hold the slices of record elements
var result []EventRecord
// Create a variable to hold the current chunk
currentChunk := EventRecord{
Time: record.Time,
Events: events.EmptyList(),
}
return func(evts *events.EventList) []*events.EventList {
var result []*events.EventList
currentChunk := events.EmptyList()

// Iterate over the events in the input slice
for _, event := range record.Events.Slice() {
for _, event := range evts.Slice() {
// Add the current element to the current chunk
currentChunk.Events.PushBack(event)
currentChunk.PushBack(event)
eventCount++
// If the current chunk has the desired number of events, append it to the result and start a new chunk
if eventCount%eventLimit == 0 {
result = append(result, currentChunk)
currentChunk = EventRecord{
Time: record.Time,
Events: events.EmptyList(),
}
currentChunk = events.EmptyList()
}
}

// If there is a remaining chunk with fewer than the desired number of events, append it to the result
if currentChunk.Events.Len() > 0 {
if currentChunk.Len() > 0 {
result = append(result, currentChunk)
}

return result
}
}

func OneFileLogger() func(EventRecord) []EventRecord {
return func(record EventRecord) []EventRecord {
return []EventRecord{record}
func OneFileLogger() func(*events.EventList) []*events.EventList {
return func(evts *events.EventList) []*events.EventList {
return []*events.EventList{evts}
}
}
Loading