diff --git a/app/grpc_server/main.go b/app/grpc_server/main.go index 8a179fc..c5e2a9d 100644 --- a/app/grpc_server/main.go +++ b/app/grpc_server/main.go @@ -17,13 +17,13 @@ import ( const prefix = "QUBIC_API_SIDECAR" func main() { - log := log.New(os.Stdout, prefix, log.LstdFlags|log.Lmicroseconds|log.Lshortfile) - if err := run(log); err != nil { - log.Fatalf("main: exited with error: %s", err.Error()) + logger := log.New(os.Stdout, prefix, log.LstdFlags|log.Lmicroseconds|log.Lshortfile) + if err := run(logger); err != nil { + logger.Fatalf("main: exited with error: %s", err.Error()) } } -func run(log *log.Logger) error { +func run(logger *log.Logger) error { var cfg struct { Server struct { ReadTimeout time.Duration `conf:"default:5s"` @@ -68,7 +68,7 @@ func run(log *log.Logger) error { if err != nil { return errors.Wrap(err, "generating config for output") } - log.Printf("main: Config :\n%v\n", out) + logger.Printf("main: Config :\n%v\n", out) pool, err := qubic.NewPoolConnection(qubic.PoolConfig{ InitialCap: cfg.Pool.InitialCap, @@ -83,8 +83,11 @@ func run(log *log.Logger) error { return errors.Wrap(err, "creating qubic pool") } - rpcServer := rpc.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, pool, cfg.Server.MaxTickFetchUrl) - rpcServer.Start() + rpcServer := rpc.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, logger, pool, cfg.Server.MaxTickFetchUrl) + err = rpcServer.Start() + if err != nil { + return errors.Wrap(err, "starting rpc server") + } shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM) diff --git a/foundation/rpc_server/rpc_server.go b/foundation/rpc_server/rpc_server.go index 75f1405..9614aad 100644 --- a/foundation/rpc_server/rpc_server.go +++ b/foundation/rpc_server/rpc_server.go @@ -5,27 +5,23 @@ import ( "context" "encoding/base64" "encoding/json" - "fmt" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/pkg/errors" "github.com/qubic/go-node-connector/types" "github.com/qubic/go-schnorrq" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/reflection" "google.golang.org/grpc/status" + "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/emptypb" "io" + "net" qubic "github.com/qubic/go-node-connector" "github.com/qubic/qubic-http/protobuff" - "google.golang.org/grpc" - - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/reflection" - - "google.golang.org/protobuf/encoding/protojson" - "log" - "net" "net/http" ) @@ -33,16 +29,18 @@ var _ protobuff.QubicLiveServiceServer = &Server{} type Server struct { protobuff.UnimplementedQubicLiveServiceServer + logger *log.Logger listenAddrGRPC string listenAddrHTTP string qPool *qubic.Pool maxTickFetchUrl string } -func NewServer(listenAddrGRPC, listenAddrHTTP string, qPool *qubic.Pool, maxTickFetchUrl string) *Server { +func NewServer(listenAddrGRPC, listenAddrHTTP string, logger *log.Logger, qPool *qubic.Pool, maxTickFetchUrl string) *Server { return &Server{ listenAddrGRPC: listenAddrGRPC, listenAddrHTTP: listenAddrHTTP, + logger: logger, qPool: qPool, maxTickFetchUrl: maxTickFetchUrl, } @@ -174,12 +172,12 @@ func (s *Server) BroadcastTransaction(ctx context.Context, req *protobuff.Broadc } maxTick, err := fetchMaxTick(ctx, s.maxTickFetchUrl) - if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - if transaction.Tick < maxTick { + offsetTick := transaction.Tick - maxTick + if offsetTick <= 0 { return nil, status.Errorf(codes.InvalidArgument, "Target tick: %d for the transaction should be greater than max tick: %d", transaction.Tick, maxTick) } @@ -188,10 +186,26 @@ func (s *Server) BroadcastTransaction(ctx context.Context, req *protobuff.Broadc return nil, status.Error(codes.Internal, err.Error()) } - fmt.Printf("Transaction: %s | Target tick: %d | Max tick: %d\n", transactionId, transaction.Tick, maxTick) + var sourceID types.Identity + sourceID, err = sourceID.FromPubKey(transaction.SourcePublicKey, false) + if err != nil { + return nil, status.Error(codes.Internal, errors.Wrap(err, "getting source ID").Error()) + } + + var destID types.Identity + destID, err = destID.FromPubKey(transaction.DestinationPublicKey, false) + if err != nil { + return nil, status.Error(codes.Internal, errors.Wrap(err, "getting dest ID").Error()) + } + + peersBroadcasted := broadcastTxToMultiple(ctx, s.qPool, decodedTx) + s.logger.Printf("Tx ID: %s | Source: %s | Dest: %s | Target tick: %d | Max tick: %d | Offset tick: %d | Peers broadcasted: %d\n", transactionId, sourceID, destID, transaction.Tick, maxTick, offsetTick, peersBroadcasted) + if peersBroadcasted == 0 { + return nil, status.Error(codes.Internal, "tx wasn't broadcast to any peers, please retry") + } return &protobuff.BroadcastTransactionResponse{ - PeersBroadcasted: int32(broadcastTxToMultiple(ctx, s.qPool, decodedTx)), + PeersBroadcasted: int32(peersBroadcasted), EncodedTransaction: req.EncodedTransaction, TransactionId: transactionId, }, nil @@ -219,56 +233,6 @@ func broadcastTxToMultiple(ctx context.Context, pool *qubic.Pool, decodedTx []by return nrSuccess } -func (s *Server) Start() error { - srv := grpc.NewServer( - grpc.MaxRecvMsgSize(600*1024*1024), - grpc.MaxSendMsgSize(600*1024*1024), - ) - protobuff.RegisterQubicLiveServiceServer(srv, s) - reflection.Register(srv) - - lis, err := net.Listen("tcp", s.listenAddrGRPC) - if err != nil { - log.Fatalf("failed to listen: %v", err) - } - - go func() { - if err := srv.Serve(lis); err != nil { - panic(err) - } - }() - - if s.listenAddrHTTP != "" { - go func() { - mux := runtime.NewServeMux(runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{ - MarshalOptions: protojson.MarshalOptions{EmitDefaultValues: true, EmitUnpopulated: false}, - })) - opts := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(600*1024*1024), - grpc.MaxCallSendMsgSize(600*1024*1024), - ), - } - - if err := protobuff.RegisterQubicLiveServiceHandlerFromEndpoint( - context.Background(), - mux, - s.listenAddrGRPC, - opts, - ); err != nil { - panic(err) - } - - if err := http.ListenAndServe(s.listenAddrHTTP, mux); err != nil { - panic(err) - } - }() - } - - return nil -} - func int8ArrayToString(array []int8) string { runes := make([]rune, 0) @@ -489,3 +453,53 @@ func (s *Server) GetPossessedAssets(ctx context.Context, req *protobuff.Possesse return &protobuff.PossessedAssetsResponse{PossessedAssets: possessedAssets}, nil } + +func (s *Server) Start() error { + srv := grpc.NewServer( + grpc.MaxRecvMsgSize(600*1024*1024), + grpc.MaxSendMsgSize(600*1024*1024), + ) + protobuff.RegisterQubicLiveServiceServer(srv, s) + reflection.Register(srv) + + lis, err := net.Listen("tcp", s.listenAddrGRPC) + if err != nil { + return errors.Wrap(err, "listening gRPC") + } + + go func() { + if err := srv.Serve(lis); err != nil { + panic(err) + } + }() + + if s.listenAddrHTTP != "" { + go func() { + mux := runtime.NewServeMux(runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{ + MarshalOptions: protojson.MarshalOptions{EmitDefaultValues: true, EmitUnpopulated: false}, + })) + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(600*1024*1024), + grpc.MaxCallSendMsgSize(600*1024*1024), + ), + } + + if err := protobuff.RegisterQubicLiveServiceHandlerFromEndpoint( + context.Background(), + mux, + s.listenAddrGRPC, + opts, + ); err != nil { + panic(err) + } + + if err := http.ListenAndServe(s.listenAddrHTTP, mux); err != nil { + panic(err) + } + }() + } + + return nil +}