Skip to content

Commit

Permalink
improved logging capabilities on broadcast transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
0xluk committed Aug 1, 2024
1 parent 1d06b2e commit 17e349a
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 71 deletions.
17 changes: 10 additions & 7 deletions app/grpc_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
const prefix = "QUBIC_API_SIDECAR"

func main() {
log := log.New(os.Stdout, prefix, log.LstdFlags|log.Lmicroseconds|log.Lshortfile)
if err := run(log); err != nil {
log.Fatalf("main: exited with error: %s", err.Error())
logger := log.New(os.Stdout, prefix, log.LstdFlags|log.Lmicroseconds|log.Lshortfile)
if err := run(logger); err != nil {
logger.Fatalf("main: exited with error: %s", err.Error())
}
}

func run(log *log.Logger) error {
func run(logger *log.Logger) error {
var cfg struct {
Server struct {
ReadTimeout time.Duration `conf:"default:5s"`
Expand Down Expand Up @@ -68,7 +68,7 @@ func run(log *log.Logger) error {
if err != nil {
return errors.Wrap(err, "generating config for output")
}
log.Printf("main: Config :\n%v\n", out)
logger.Printf("main: Config :\n%v\n", out)

pool, err := qubic.NewPoolConnection(qubic.PoolConfig{
InitialCap: cfg.Pool.InitialCap,
Expand All @@ -83,8 +83,11 @@ func run(log *log.Logger) error {
return errors.Wrap(err, "creating qubic pool")
}

rpcServer := rpc.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, pool, cfg.Server.MaxTickFetchUrl)
rpcServer.Start()
rpcServer := rpc.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, logger, pool, cfg.Server.MaxTickFetchUrl)
err = rpcServer.Start()
if err != nil {
return errors.Wrap(err, "starting rpc server")
}

shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
Expand Down
142 changes: 78 additions & 64 deletions foundation/rpc_server/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,42 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/pkg/errors"
"github.com/qubic/go-node-connector/types"
"github.com/qubic/go-schnorrq"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/emptypb"
"io"
"net"

qubic "github.com/qubic/go-node-connector"
"github.com/qubic/qubic-http/protobuff"
"google.golang.org/grpc"

"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"

"google.golang.org/protobuf/encoding/protojson"

"log"
"net"
"net/http"
)

var _ protobuff.QubicLiveServiceServer = &Server{}

type Server struct {
protobuff.UnimplementedQubicLiveServiceServer
logger *log.Logger
listenAddrGRPC string
listenAddrHTTP string
qPool *qubic.Pool
maxTickFetchUrl string
}

func NewServer(listenAddrGRPC, listenAddrHTTP string, qPool *qubic.Pool, maxTickFetchUrl string) *Server {
func NewServer(listenAddrGRPC, listenAddrHTTP string, logger *log.Logger, qPool *qubic.Pool, maxTickFetchUrl string) *Server {
return &Server{
listenAddrGRPC: listenAddrGRPC,
listenAddrHTTP: listenAddrHTTP,
logger: logger,
qPool: qPool,
maxTickFetchUrl: maxTickFetchUrl,
}
Expand Down Expand Up @@ -174,12 +172,12 @@ func (s *Server) BroadcastTransaction(ctx context.Context, req *protobuff.Broadc
}

maxTick, err := fetchMaxTick(ctx, s.maxTickFetchUrl)

if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

if transaction.Tick < maxTick {
offsetTick := transaction.Tick - maxTick
if offsetTick <= 0 {
return nil, status.Errorf(codes.InvalidArgument, "Target tick: %d for the transaction should be greater than max tick: %d", transaction.Tick, maxTick)
}

Expand All @@ -188,10 +186,26 @@ func (s *Server) BroadcastTransaction(ctx context.Context, req *protobuff.Broadc
return nil, status.Error(codes.Internal, err.Error())
}

fmt.Printf("Transaction: %s | Target tick: %d | Max tick: %d\n", transactionId, transaction.Tick, maxTick)
var sourceID types.Identity
sourceID, err = sourceID.FromPubKey(transaction.SourcePublicKey, false)
if err != nil {
return nil, status.Error(codes.Internal, errors.Wrap(err, "getting source ID").Error())
}

var destID types.Identity
destID, err = destID.FromPubKey(transaction.DestinationPublicKey, false)
if err != nil {
return nil, status.Error(codes.Internal, errors.Wrap(err, "getting dest ID").Error())
}

peersBroadcasted := broadcastTxToMultiple(ctx, s.qPool, decodedTx)
s.logger.Printf("Tx ID: %s | Source: %s | Dest: %s | Target tick: %d | Max tick: %d | Offset tick: %d | Peers broadcasted: %d\n", transactionId, sourceID, destID, transaction.Tick, maxTick, offsetTick, peersBroadcasted)
if peersBroadcasted == 0 {
return nil, status.Error(codes.Internal, "tx wasn't broadcast to any peers, please retry")
}

return &protobuff.BroadcastTransactionResponse{
PeersBroadcasted: int32(broadcastTxToMultiple(ctx, s.qPool, decodedTx)),
PeersBroadcasted: int32(peersBroadcasted),
EncodedTransaction: req.EncodedTransaction,
TransactionId: transactionId,
}, nil
Expand Down Expand Up @@ -219,56 +233,6 @@ func broadcastTxToMultiple(ctx context.Context, pool *qubic.Pool, decodedTx []by
return nrSuccess
}

func (s *Server) Start() error {
srv := grpc.NewServer(
grpc.MaxRecvMsgSize(600*1024*1024),
grpc.MaxSendMsgSize(600*1024*1024),
)
protobuff.RegisterQubicLiveServiceServer(srv, s)
reflection.Register(srv)

lis, err := net.Listen("tcp", s.listenAddrGRPC)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

go func() {
if err := srv.Serve(lis); err != nil {
panic(err)
}
}()

if s.listenAddrHTTP != "" {
go func() {
mux := runtime.NewServeMux(runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{EmitDefaultValues: true, EmitUnpopulated: false},
}))
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(600*1024*1024),
grpc.MaxCallSendMsgSize(600*1024*1024),
),
}

if err := protobuff.RegisterQubicLiveServiceHandlerFromEndpoint(
context.Background(),
mux,
s.listenAddrGRPC,
opts,
); err != nil {
panic(err)
}

if err := http.ListenAndServe(s.listenAddrHTTP, mux); err != nil {
panic(err)
}
}()
}

return nil
}

func int8ArrayToString(array []int8) string {
runes := make([]rune, 0)

Expand Down Expand Up @@ -489,3 +453,53 @@ func (s *Server) GetPossessedAssets(ctx context.Context, req *protobuff.Possesse

return &protobuff.PossessedAssetsResponse{PossessedAssets: possessedAssets}, nil
}

func (s *Server) Start() error {
srv := grpc.NewServer(
grpc.MaxRecvMsgSize(600*1024*1024),
grpc.MaxSendMsgSize(600*1024*1024),
)
protobuff.RegisterQubicLiveServiceServer(srv, s)
reflection.Register(srv)

lis, err := net.Listen("tcp", s.listenAddrGRPC)
if err != nil {
return errors.Wrap(err, "listening gRPC")
}

go func() {
if err := srv.Serve(lis); err != nil {
panic(err)
}
}()

if s.listenAddrHTTP != "" {
go func() {
mux := runtime.NewServeMux(runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{EmitDefaultValues: true, EmitUnpopulated: false},
}))
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(600*1024*1024),
grpc.MaxCallSendMsgSize(600*1024*1024),
),
}

if err := protobuff.RegisterQubicLiveServiceHandlerFromEndpoint(
context.Background(),
mux,
s.listenAddrGRPC,
opts,
); err != nil {
panic(err)
}

if err := http.ListenAndServe(s.listenAddrHTTP, mux); err != nil {
panic(err)
}
}()
}

return nil
}

0 comments on commit 17e349a

Please sign in to comment.