Skip to content

Commit

Permalink
address feedback from PR #3
Browse files Browse the repository at this point in the history
  • Loading branch information
cortze committed May 22, 2023
1 parent dc26716 commit a5a3e9b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
12 changes: 6 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,12 +730,12 @@ var eventDefs = map[EventType]EventDef{
}
peerID, err := getPeerIDFromTraceBytes(ev.PeerID)
if err != nil {
logger.Debug("error", err, "peer_id", ev.PeerID)
logger.Debug("skipping event, bad peer id", "error", err, "peer_id", ev.PeerID)
}

otherPeerID, err := getPeerIDFromTraceBytes(sub.PeerID)
if err != nil {
logger.Debug("error", err, "peer_id", sub.PeerID)
logger.Debug("skipping event, bad remote peer id", "error", err, "peer_id", sub.PeerID)
}

values := make([]any, 0, len(parentCols)+len(sub.Topics)*len(childCols))
Expand Down Expand Up @@ -837,7 +837,7 @@ var eventDefs = map[EventType]EventDef{
childColsLen = 2 // (rpc_event_id, message_id)
}
case UntraceableMessage:
logger.Warn("received unsupported SendRPC event", ev)
logger.Warn("received unsupported SendRPC event", "msg_type", ev.SendRPC.Meta)
continue
}
// append parents values
Expand Down Expand Up @@ -923,12 +923,12 @@ var eventDefs = map[EventType]EventDef{
// Get Parent info
peerID, err := peer.IDFromBytes([]byte(ev.PeerID))
if err != nil {
logger.Debug("skipping event, bad peer id", err, "peer_id", ev.PeerID)
logger.Debug("skipping event, bad peer id", "error", err, "peer_id", ev.PeerID)
continue
}
remotePeerId, err := peer.IDFromBytes([]byte(ev.RecvRPC.ReceivedFrom))
if err != nil {
logger.Debug("skipping event, bad peer id", err, "peer_id", ev.PeerID)
logger.Debug("skipping event, bad peer id", "error", err, "peer_id", ev.RecvRPC.ReceivedFrom)
continue
}

Expand Down Expand Up @@ -969,7 +969,7 @@ var eventDefs = map[EventType]EventDef{
childColsLen = 2 // (rpc_event_id, message_id)
}
case UntraceableMessage:
logger.Warn("received unsupported RecvRPC event", ev)
logger.Warn("received unsupported RecvRPC event", "msg_type", ev.RecvRPC.Meta)
continue
}

Expand Down
18 changes: 9 additions & 9 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,16 @@ func (s *Server) BulkTraceHandler(w http.ResponseWriter, r *http.Request) {
return
} else {
reqDec := json.NewDecoder(bytes.NewReader(reqBuf.Bytes()))
decoderLoop:
for reqDec.More() {
var tracEv TraceEvent
decErr := reqDec.Decode(&tracEv)
if decErr == io.EOF {
break decoderLoop
break
} else if decErr != nil {
total++
failed++
slog.Error("decoding ES trace:", decErr)
continue decoderLoop
continue
} else {
total++
if tracEv.Type != nil {
Expand All @@ -90,21 +89,22 @@ func (s *Server) BulkTraceHandler(w http.ResponseWriter, r *http.Request) {
} else {
empty++
}
continue decoderLoop
continue
}
}
slog.Info(fmt.Sprintf("received bulk resp: total %d suc %d, emtpy %d, failed %d",
total,
successful,
empty,
failed),
slog.Debug("received bulk request",
"total", total,
"successful", successful,
"empty", empty,
"failed", failed,
)
// compose reply
finT := time.Since(iniT)
bulkResp.Took = int(finT.Seconds())
jres, err := json.Marshal(bulkResp)
if err != nil {
slog.Error("unable to compose bulk response", err)
w.WriteHeader(http.StatusBadRequest)
}
w.Write(jres)
}
Expand Down

0 comments on commit a5a3e9b

Please sign in to comment.