Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added function related to the debugger #528

Merged
merged 19 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e750f30
modified functions to return EventList
Nov 13, 2023
dc88b2a
modified functions to return EventList
Nov 13, 2023
d88c2f1
Confine EventRecord usage to the recorder impl.
matejpavlovic Nov 28, 2023
ef1a743
Merge remote-tracking branch 'origin/visualization' into visualization
Nov 28, 2023
e40e168
Removed InterceptWithReturn and modified Intercept to now return an E…
Cath3876 Nov 28, 2023
e6fa802
changed return value to nil and feed output of one interceptor to inp…
Cath3876 Nov 30, 2023
211222d
Added functions for the debugger in debugger.go
Cath3876 Dec 7, 2023
a7e0190
changed main.go in pingpong to be able to use the debugger
Cath3876 Dec 7, 2023
0685c50
minor fixes to pass checks on pull request.
Cath3876 Dec 7, 2023
d6b7356
minor fixes to pass checks on pull request.
Cath3876 Dec 7, 2023
f3f04be
minor fixes to pass checks on pull request.
Cath3876 Dec 7, 2023
10e585d
minor fixes to pass checks on pull request.
Cath3876 Dec 7, 2023
83e0706
Merge branch 'visualization' into debugger
Cath3876 Jan 30, 2024
b0ee2f0
InterceptorInit() now always returns the interceptor,
Cath3876 Jan 30, 2024
5bdc11c
addressed failed checks
Cath3876 Jan 30, 2024
b369a27
Simplify interface-string conversions
matejpavlovic Feb 5, 2024
d193192
change InterceptorInit to NewWebSocketDebugger and update its descrip…
Cath3876 Feb 6, 2024
04b6a7c
Pass logger as parameter to the debugger creation
matejpavlovic Feb 6, 2024
12827ef
Clean up a bit and separate debugger in 2 files
matejpavlovic Feb 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 202 additions & 0 deletions pkg/debugger/debugger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package debugger

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"time"

"github.com/gorilla/websocket"

"github.com/filecoin-project/mir/pkg/eventlog"
"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
t "github.com/filecoin-project/mir/pkg/types"
)

const (
ReadBufferSize = 1024
WriteBufferSize = 1024
)

type WSWriter struct {
// ... websocket server variables ...
conn *websocket.Conn
upgrader websocket.Upgrader
eventSignal chan map[string]interface{}
WSMessage struct {
Type string `json:"Type"`
Value string `json:"Value"`
}
}

// InterceptorInit initializes the interceptor according to input boolean
// The interceptor is set to nil if the user decides to not use the debugger
func InterceptorInit(
ownID t.NodeID,
port string,
) (*eventlog.Recorder, error) {
// writerFactory creates and returns a WebSocket-based event writer
writerFactory := func(_ string, ownID t.NodeID, _ logging.Logger) (eventlog.EventWriter, error) {
return newWSWriter(fmt.Sprintf(":%s", port)), nil
}

var interceptor *eventlog.Recorder
var err error
interceptor, err = eventlog.NewRecorder(
ownID,
fmt.Sprintf("./node%s", ownID),
logging.ConsoleInfoLogger,
eventlog.EventWriterOpt(writerFactory),
eventlog.SyncWriteOpt(),
)
if err != nil {
panic(err)
matejpavlovic marked this conversation as resolved.
Show resolved Hide resolved
}
return interceptor, err
}

// Flush does nothing at the moment
func (wsw *WSWriter) Flush() error {
return nil
}

// Close closes the connection
func (wsw *WSWriter) Close() error {
if wsw.conn == nil {
return nil
}
return wsw.conn.Close()
}

// Write sends every event to the frontend and then wits for a message detailing how to proceed with that event
// The returned EventList contains the accepted events
func (wsw *WSWriter) Write(list *events.EventList, _ int64) (*events.EventList, error) {
for wsw.conn == nil {
fmt.Println("No connection.")
matejpavlovic marked this conversation as resolved.
Show resolved Hide resolved
time.Sleep(time.Millisecond * 100)
}
if list.Len() == 0 {
return list, nil
}

acceptedEvents := events.EmptyList()
iter := list.Iterator()

for event := iter.Next(); event != nil; event = iter.Next() {
// Create a new JSON object with a timestamp field
timestamp := time.Now()
logData := map[string]interface{}{
"event": event,
"timestamp": timestamp,
}

// Marshal the JSON data
message, err := json.Marshal(logData)
if err != nil {
panic(err)
}

// Send the JSON message over WebSocket
if err := wsw.conn.WriteMessage(websocket.TextMessage, message); err != nil {
return list, fmt.Errorf("error sending message over WebSocket: %w", err)
}

eventAction := <-wsw.eventSignal
actionType, _ := eventAction["Type"].(string)
value, _ := eventAction["Value"].(string)
acceptedEvents, _ = EventAction(actionType, value, acceptedEvents, event)
}
return acceptedEvents, nil
}

// EventAction decides, based on the input what exactly is done next with the current event
func EventAction(
actionType string,
_ string,
acceptedEvents *events.EventList,
currentEvent *eventpb.Event,
) (*events.EventList, error) {
if actionType == "accept" {
acceptedEvents.PushBack(currentEvent)
}
return acceptedEvents, nil
}

func (wsw *WSWriter) HandleClientSignal(signal map[string]interface{}) {
wsw.eventSignal <- signal
}

// newWSWriter creates a new WSWriter that establishes a websocket connection
func newWSWriter(port string) *WSWriter {

// Create a new WSWriter object
wsWriter := &WSWriter{
upgrader: websocket.Upgrader{
ReadBufferSize: ReadBufferSize,
WriteBufferSize: WriteBufferSize,
},
eventSignal: make(chan map[string]interface{}),
}

http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
wsWriter.upgrader.CheckOrigin = func(r *http.Request) bool { return true } // Allow opening the connection by HTML file
conn, err := wsWriter.upgrader.Upgrade(w, r, nil)
if err != nil {
panic(err)
}

wsWriter.conn = conn
defer func() {
err := wsWriter.Close()
if err != nil {
panic(err)
}
}() // Ensure the connection is closed when the function exits

for {
messageType, message, err := conn.ReadMessage()
if err != nil || messageType != websocket.TextMessage {
break
}

var signal map[string]interface{}
err = json.Unmarshal(message, &signal)
if err != nil {
panic(err)
}

signalType, typeOk := signal["Type"].(string)
signalValue, valueOk := signal["Value"].(string)
if !typeOk || !valueOk {
panic(fmt.Sprintf("Invalid signal format: Type or Value key missing or not a string in %+v", signal))
}

// Check if the signal is a 'close' command
if signalType == "close" && signalValue == "" {
break
}

wsWriter.HandleClientSignal(signal)
}
})

// Create an Async go routine that waits for the connection
go func() {
server := &http.Server{
Addr: port,
Handler: nil,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 15 * time.Second,
}

err := server.ListenAndServe()
matejpavlovic marked this conversation as resolved.
Show resolved Hide resolved
if err != nil && !errors.Is(err, http.ErrServerClosed) {
panic(err)
}
}()
return wsWriter
}
41 changes: 35 additions & 6 deletions samples/pingpong/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package main

import (
"context"
"flag"
"fmt"
"os"
"time"

"github.com/filecoin-project/mir/pkg/debugger"

"github.com/filecoin-project/mir"
"github.com/filecoin-project/mir/pkg/eventlog"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/net/grpc"
Expand All @@ -18,17 +22,42 @@ import (

func main() {
fmt.Println("Starting ping-pong.")

// Manually create system membership with just 2 nodes.
membership := &trantorpbtypes.Membership{map[t.NodeID]*trantorpbtypes.NodeIdentity{ // nolint:govet
"0": {"0", "/ip4/127.0.0.1/tcp/10000", nil, "1"}, // nolint:govet
"1": {"1", "/ip4/127.0.0.1/tcp/10001", nil, "1"}, // nolint:govet
}}

// Get own ID from command line.
ownID := t.NodeID(os.Args[1])
debug := flag.Bool("d", false, "Enable debug mode")
debugPort := flag.String("port", "", "Debug port number")
flag.Parse()
var ownID t.NodeID
var interceptor *eventlog.Recorder
var err error

if *debug {
// In debug mode, expect the next argument to be the node ID
if flag.NArg() > 0 {
ownID = t.NodeID(flag.Arg(0))
} else {
fmt.Println("Node ID must be provided in debug mode")
os.Exit(1)
}
interceptor, err = debugger.InterceptorInit(ownID, *debugPort) // replace ownID with port number
if err != nil {
panic(err)
}
} else {
// If not in debug mode, use the first argument as the node ID
if len(os.Args) > 1 {
ownID = t.NodeID(os.Args[1])
} else {
fmt.Println("Node ID must be provided")
os.Exit(1)
}
}

// Instantiate network trnasport module and establish connections.
// Instantiate network transport module and establish connections.
transport, err := grpc.NewTransport(ownID, membership.Nodes[ownID].Addr, logging.ConsoleWarnLogger)
if err != nil {
panic(err)
Expand All @@ -48,7 +77,7 @@ func main() {
"pingpong": lowlevel.NewPingPong(ownID),
"timer": timer.New(),
},
nil,
interceptor,
)
if err != nil {
panic(err)
Expand All @@ -60,7 +89,7 @@ func main() {
nodeError <- node.Run(context.Background())
}()
fmt.Println("Mir node running.")
time.Sleep(5 * time.Second)
time.Sleep(50 * time.Second)

// Stop the node.
node.Stop()
Expand Down
Loading