diff --git a/db.go b/db.go index 9fbfb41..6851342 100644 --- a/db.go +++ b/db.go @@ -730,12 +730,12 @@ var eventDefs = map[EventType]EventDef{ } peerID, err := getPeerIDFromTraceBytes(ev.PeerID) if err != nil { - logger.Debug("error", err, "peer_id", ev.PeerID) + logger.Debug("skipping event, bad peer id", "error", err, "peer_id", ev.PeerID) } otherPeerID, err := getPeerIDFromTraceBytes(sub.PeerID) if err != nil { - logger.Debug("error", err, "peer_id", sub.PeerID) + logger.Debug("skipping event, bad remote peer id", "error", err, "peer_id", sub.PeerID) } values := make([]any, 0, len(parentCols)+len(sub.Topics)*len(childCols)) @@ -837,7 +837,7 @@ var eventDefs = map[EventType]EventDef{ childColsLen = 2 // (rpc_event_id, message_id) } case UntraceableMessage: - logger.Warn("received unsupported SendRPC event", ev) + logger.Warn("received unsupported SendRPC event", "msg_type", ev.SendRPC.Meta) continue } // append parents values @@ -923,12 +923,12 @@ var eventDefs = map[EventType]EventDef{ // Get Parent info peerID, err := peer.IDFromBytes([]byte(ev.PeerID)) if err != nil { - logger.Debug("skipping event, bad peer id", err, "peer_id", ev.PeerID) + logger.Debug("skipping event, bad peer id", "error", err, "peer_id", ev.PeerID) continue } remotePeerId, err := peer.IDFromBytes([]byte(ev.RecvRPC.ReceivedFrom)) if err != nil { - logger.Debug("skipping event, bad peer id", err, "peer_id", ev.PeerID) + logger.Debug("skipping event, bad peer id", "error", err, "peer_id", ev.RecvRPC.ReceivedFrom) continue } @@ -969,7 +969,7 @@ var eventDefs = map[EventType]EventDef{ childColsLen = 2 // (rpc_event_id, message_id) } case UntraceableMessage: - logger.Warn("received unsupported RecvRPC event", ev) + logger.Warn("received unsupported RecvRPC event", "msg_type", ev.RecvRPC.Meta) continue } diff --git a/server.go b/server.go index 70392b5..7e3f7ef 100644 --- a/server.go +++ b/server.go @@ -71,17 +71,16 @@ func (s *Server) BulkTraceHandler(w http.ResponseWriter, r *http.Request) { return } else { reqDec := json.NewDecoder(bytes.NewReader(reqBuf.Bytes())) - decoderLoop: for reqDec.More() { var tracEv TraceEvent decErr := reqDec.Decode(&tracEv) if decErr == io.EOF { - break decoderLoop + break } else if decErr != nil { total++ failed++ slog.Error("decoding ES trace:", decErr) - continue decoderLoop + continue } else { total++ if tracEv.Type != nil { @@ -90,14 +89,14 @@ func (s *Server) BulkTraceHandler(w http.ResponseWriter, r *http.Request) { } else { empty++ } - continue decoderLoop + continue } } - slog.Info(fmt.Sprintf("received bulk resp: total %d suc %d, emtpy %d, failed %d", - total, - successful, - empty, - failed), + slog.Debug("received bulk request", + "total", total, + "successful", successful, + "empty", empty, + "failed", failed, ) // compose reply finT := time.Since(iniT) @@ -105,6 +104,7 @@ func (s *Server) BulkTraceHandler(w http.ResponseWriter, r *http.Request) { jres, err := json.Marshal(bulkResp) if err != nil { slog.Error("unable to compose bulk response", err) + w.WriteHeader(http.StatusBadRequest) } w.Write(jres) }