Skip to content

Commit

Permalink
InterceptorInit() now always returns the interceptor,
Browse files Browse the repository at this point in the history
cleaned up newWSWriter(),
add optional -d flag for debug mode and refactor argument parsing
  • Loading branch information
Cath3876 committed Jan 30, 2024
1 parent 83e0706 commit b0ee2f0
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 80 deletions.
133 changes: 62 additions & 71 deletions pkg/debugger/debugger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package debugger

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"time"

"github.com/gorilla/websocket"
Expand All @@ -19,48 +19,47 @@ import (
const (
ReadBufferSize = 1024
WriteBufferSize = 1024
BasePort = 8080
)

type WSWriter struct {
// ... websocket server variables ...
conn *websocket.Conn
upgrader websocket.Upgrader
eventSignal chan map[string]interface{}
WSMessage struct {
Type string `json:"Type"`
Value string `json:"Value"`
}
}

// InterceptorInit initializes the interceptor according to input boolean
// The interceptor is set to nil if the user decides to not use the debugger
func InterceptorInit(
debugger bool,
ownID t.NodeID,
port string,
) (*eventlog.Recorder, error) {
// writerFactory creates and returns a WebSocket-based event writer
writerFactory := func(_ string, ownID t.NodeID, _ logging.Logger) (eventlog.EventWriter, error) {
return newWSWriter(fmt.Sprintf(":%s", port)), nil
}

var interceptor *eventlog.Recorder
var err error
if debugger {
interceptor, err = eventlog.NewRecorder(
ownID,
fmt.Sprintf("./node%s", ownID),
logging.ConsoleInfoLogger,
eventlog.EventWriterOpt(writerFactory),
eventlog.SyncWriteOpt(),
)
if err != nil {
panic(err)
}
fmt.Println("Interceptor created successfully")
} else {
interceptor = nil
err = nil
interceptor, err = eventlog.NewRecorder(
ownID,
fmt.Sprintf("./node%s", ownID),
logging.ConsoleInfoLogger,
eventlog.EventWriterOpt(writerFactory),
eventlog.SyncWriteOpt(),
)
if err != nil {
panic(err)
}
return interceptor, err
}

// Flush does nothing at the moment
func (wsw *WSWriter) Flush() error {
if wsw.conn == nil {
return nil
}
return nil
}

Expand All @@ -80,7 +79,6 @@ func (wsw *WSWriter) Write(list *events.EventList, _ int64) (*events.EventList,
time.Sleep(time.Millisecond * 100)
}
if list.Len() == 0 {
fmt.Println("No events to print.")
return list, nil
}

Expand All @@ -107,8 +105,8 @@ func (wsw *WSWriter) Write(list *events.EventList, _ int64) (*events.EventList,
}

eventAction := <-wsw.eventSignal
actionType, _ := eventAction["type"].(string)
value, _ := eventAction["value"].(string)
actionType, _ := eventAction["Type"].(string)
value, _ := eventAction["Value"].(string)
acceptedEvents, _ = EventAction(actionType, value, acceptedEvents, event)
}
return acceptedEvents, nil
Expand All @@ -133,7 +131,6 @@ func (wsw *WSWriter) HandleClientSignal(signal map[string]interface{}) {

// newWSWriter creates a new WSWriter that establishes a websocket connection
func newWSWriter(port string) *WSWriter {
fmt.Println("Starting newWSWriter")

// Create a new WSWriter object
wsWriter := &WSWriter{
Expand All @@ -144,44 +141,50 @@ func newWSWriter(port string) *WSWriter {
eventSignal: make(chan map[string]interface{}),
}

// Create an Async go routine that waits for the connection
go func() {
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
wsWriter.upgrader.CheckOrigin = func(r *http.Request) bool { return true } // Allow opening the connection by HTML file
conn, err := wsWriter.upgrader.Upgrade(w, r, nil)
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
wsWriter.upgrader.CheckOrigin = func(r *http.Request) bool { return true } // Allow opening the connection by HTML file
conn, err := wsWriter.upgrader.Upgrade(w, r, nil)
if err != nil {
panic(err)
}

wsWriter.conn = conn
defer func() {
err := wsWriter.Close()
if err != nil {
panic(err)
}
}() // Ensure the connection is closed when the function exits

for {
messageType, message, err := conn.ReadMessage()
if err != nil || messageType != websocket.TextMessage {
break
}

var signal map[string]interface{}
err = json.Unmarshal(message, &signal)
if err != nil {
panic(err)
}

fmt.Println("WebSocket connection established")

// Update the attribute of the WSWriter object with the established connection
wsWriter.conn = conn
// go routine for incoming messages
go func() {
defer func(conn *websocket.Conn) {
err := conn.Close()
if err != nil {
panic(err)
}
}(conn) // Ensure the connection is closed when the function exits

for {
_, message, err := conn.ReadMessage()
if err != nil {
break
}

var signal map[string]interface{}
err = json.Unmarshal(message, &signal)
if err != nil {
continue
}
wsWriter.HandleClientSignal(signal)
}
}()
})
signalType, typeOk := signal["Type"].(string)
signalValue, valueOk := signal["Value"].(string)
if !typeOk || !valueOk {
panic(fmt.Sprintf("Invalid signal format: Type or Value key missing or not a string in %+v", signal))
}

// Check if the signal is a 'close' command
if signalType == "close" && signalValue == "" {
break
}

wsWriter.HandleClientSignal(signal)
}
})

// Create an Async go routine that waits for the connection
go func() {
server := &http.Server{
Addr: port,
Handler: nil,
Expand All @@ -191,21 +194,9 @@ func newWSWriter(port string) *WSWriter {
}

err := server.ListenAndServe()
if err != nil {
if err != nil && !errors.Is(err, http.ErrServerClosed) {
panic(err)
}
select {}
}()
return wsWriter
}

// writerFactory creates and returns a WebSocket-based event writer
// It determines the port for the WebSocket server based on the given nodeID and a base port value
func writerFactory(_ string, nodeID t.NodeID, _ logging.Logger) (eventlog.EventWriter, error) {
ownPort, err := strconv.Atoi(string(nodeID))
if err != nil {
panic(err)
}
ownPort += BasePort
return newWSWriter(fmt.Sprintf(":%d", ownPort)), nil
}
39 changes: 30 additions & 9 deletions samples/pingpong/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package main

import (
"context"
"flag"
"fmt"
"github.com/filecoin-project/mir/pkg/debugger"

Check failure on line 7 in samples/pingpong/main.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed with -local github.com/filecoin-project/mir (goimports)
"os"
"time"

"github.com/filecoin-project/mir"
"github.com/filecoin-project/mir/pkg/debugger"
"github.com/filecoin-project/mir/pkg/eventlog"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/modules"
Expand All @@ -26,16 +27,36 @@ func main() {
"1": {"1", "/ip4/127.0.0.1/tcp/10001", nil, "1"}, // nolint:govet
}}

// Get own ID from command line.
ownID := t.NodeID(os.Args[1])
// Get 0 for no debugger, 1 for debugger
debugg := os.Args[2]

debug := flag.Bool("d", false, "Enable debug mode")
debugPort := flag.String("port", "", "Debug port number")
flag.Parse()
var ownID t.NodeID
var interceptor *eventlog.Recorder
var err error
interceptor, _ = debugger.InterceptorInit(debugg == "1", ownID) //debugger.InterceptorInit(debugger == "1", ownID)

// Instantiate network trnasport module and establish connections.
if *debug {
// In debug mode, expect the next argument to be the node ID
if flag.NArg() > 0 {
ownID = t.NodeID(flag.Arg(0))
} else {
fmt.Println("Node ID must be provided in debug mode")
os.Exit(1)
}
interceptor, err = debugger.InterceptorInit(ownID, *debugPort) // replace ownID with port number
if err != nil {
panic(err)
}
} else {
// If not in debug mode, use the first argument as the node ID
if len(os.Args) > 1 {
ownID = t.NodeID(os.Args[1])
} else {
fmt.Println("Node ID must be provided")
os.Exit(1)
}
}

// Instantiate network transport module and establish connections.
transport, err := grpc.NewTransport(ownID, membership.Nodes[ownID].Addr, logging.ConsoleWarnLogger)
if err != nil {
panic(err)
Expand Down Expand Up @@ -67,7 +88,7 @@ func main() {
nodeError <- node.Run(context.Background())
}()
fmt.Println("Mir node running.")
time.Sleep(5 * time.Second)
time.Sleep(50 * time.Second)

// Stop the node.
node.Stop()
Expand Down

0 comments on commit b0ee2f0

Please sign in to comment.