diff --git a/api/tbcapi/tbcapi.go b/api/tbcapi/tbcapi.go index d4eb57ae1..995942f9e 100644 --- a/api/tbcapi/tbcapi.go +++ b/api/tbcapi/tbcapi.go @@ -70,6 +70,12 @@ const ( CmdBlockInsertRawRequest = "tbcapi-block-insert-raw-request" CmdBlockInsertRawResponse = "tbcapi-block-insert-raw-response" + + CmdBlockDownloadAsyncRequest = "tbcapi-block-download-async-request" + CmdBlockDownloadAsyncResponse = "tbcapi-block-download-async-response" + + CmdBlockDownloadAsyncRawRequest = "tbcapi-block-download-async-raw-request" + CmdBlockDownloadAsyncRawResponse = "tbcapi-block-download-async-raw-response" ) var ( @@ -283,6 +289,31 @@ type BlockInsertRawResponse struct { Error *protocol.Error `json:"error,omitempty"` } +// BlockDownloadAsyncResponse returns a block if it exists or attempts to +// download the block from p2p asynchronously. +type BlockDownloadAsyncRequest struct { + Hash *chainhash.Hash `json:"hash"` + Peers uint `json:"peers"` +} + +// BlockDownloadAsyncResponse replies with a block, an error or nothing. When +// bot Error and Block are nil it measn the block download request was issued +// to p2p. +type BlockDownloadAsyncResponse struct { + Block *wire.MsgBlock `json:"block,omitempty"` + Error *protocol.Error `json:"error,omitempty"` +} + +type BlockDownloadAsyncRawRequest struct { + Hash *chainhash.Hash `json:"hash"` + Peers uint `json:"peers"` +} + +type BlockDownloadAsyncRawResponse struct { + Block api.ByteSlice `json:"block,omitempty"` + Error *protocol.Error `json:"error,omitempty"` +} + var commands = map[protocol.Command]reflect.Type{ CmdPingRequest: reflect.TypeOf(PingRequest{}), CmdPingResponse: reflect.TypeOf(PingResponse{}), @@ -316,6 +347,10 @@ var commands = map[protocol.Command]reflect.Type{ CmdBlockInsertResponse: reflect.TypeOf(BlockInsertResponse{}), CmdBlockInsertRawRequest: reflect.TypeOf(BlockInsertRawRequest{}), CmdBlockInsertRawResponse: reflect.TypeOf(BlockInsertRawResponse{}), + CmdBlockDownloadAsyncRequest: reflect.TypeOf(BlockDownloadAsyncRequest{}), + CmdBlockDownloadAsyncResponse: reflect.TypeOf(BlockDownloadAsyncResponse{}), + CmdBlockDownloadAsyncRawRequest: reflect.TypeOf(BlockDownloadAsyncRawRequest{}), + CmdBlockDownloadAsyncRawResponse: reflect.TypeOf(BlockDownloadAsyncRawResponse{}), } type tbcAPI struct{} diff --git a/cmd/hemictl/hemictl.go b/cmd/hemictl/hemictl.go index 987265ac7..779df1ead 100644 --- a/cmd/hemictl/hemictl.go +++ b/cmd/hemictl/hemictl.go @@ -1153,7 +1153,7 @@ func _main() error { default: return fmt.Errorf("can't derive URL from command: %v", cmd) } - conn, err := protocol.NewConn(u, nil) + conn, err := protocol.NewConn(u, &protocol.ConnOptions{ReadLimit: 8 * 1024 * 1024}) if err != nil { return err } diff --git a/service/tbc/rpc.go b/service/tbc/rpc.go index 2d897dae1..b39de928e 100644 --- a/service/tbc/rpc.go +++ b/service/tbc/rpc.go @@ -176,6 +176,20 @@ func (s *Server) handleWebsocketRead(ctx context.Context, ws *tbcWs) { return s.handleBlockInsertRawRequest(ctx, req) } + go s.handleRequest(ctx, ws, id, cmd, handler) + case tbcapi.CmdBlockDownloadAsyncRequest: + handler := func(ctx context.Context) (any, error) { + req := payload.(*tbcapi.BlockDownloadAsyncRequest) + return s.handleBlockDownloadAsyncRequest(ctx, req) + } + + go s.handleRequest(ctx, ws, id, cmd, handler) + case tbcapi.CmdBlockDownloadAsyncRawRequest: + handler := func(ctx context.Context) (any, error) { + req := payload.(*tbcapi.BlockDownloadAsyncRawRequest) + return s.handleBlockDownloadAsyncRawRequest(ctx, req) + } + go s.handleRequest(ctx, ws, id, cmd, handler) default: err = fmt.Errorf("unknown command: %v", cmd) @@ -613,6 +627,64 @@ func (s *Server) handleBlockInsertRawRequest(ctx context.Context, req *tbcapi.Bl return &tbcapi.BlockInsertRawResponse{BlockHash: &hash}, nil } +func (s *Server) handleBlockDownloadAsyncRequest(ctx context.Context, req *tbcapi.BlockDownloadAsyncRequest) (any, error) { + log.Tracef("handleBlockAsyncDownloadRequest") + defer log.Tracef("handleBlockAsyncDownloadRequest exit") + + if req.Hash == nil { + return &tbcapi.BlockDownloadAsyncResponse{ + Error: protocol.RequestErrorf("no hash"), + }, nil + } + if req.Peers > 5 { + return &tbcapi.BlockDownloadAsyncResponse{ + Error: protocol.RequestErrorf("too many peers"), + }, nil + } + + blk, err := s.DownloadBlockFromRandomPeers(ctx, req.Hash, req.Peers) + if err != nil { + e := protocol.NewInternalError(err) + return &tbcapi.BlockDownloadAsyncRawResponse{Error: e.ProtocolError()}, e + } + if blk == nil { + return &tbcapi.BlockDownloadAsyncResponse{}, nil + } + return &tbcapi.BlockDownloadAsyncResponse{Block: blk.MsgBlock()}, nil +} + +func (s *Server) handleBlockDownloadAsyncRawRequest(ctx context.Context, req *tbcapi.BlockDownloadAsyncRawRequest) (any, error) { + log.Tracef("handleBlockDownloadAsyncRawRequest") + defer log.Tracef("handleBlockDownloadAsyncRawRequest exit") + + if req.Hash == nil { + return &tbcapi.BlockDownloadAsyncRawResponse{ + Error: protocol.RequestErrorf("no hash"), + }, nil + } + if req.Peers > 5 { + return &tbcapi.BlockDownloadAsyncRawResponse{ + Error: protocol.RequestErrorf("too many peers"), + }, nil + } + + blk, err := s.DownloadBlockFromRandomPeers(ctx, req.Hash, req.Peers) + if err != nil { + e := protocol.NewInternalError(err) + return &tbcapi.BlockDownloadAsyncRawResponse{Error: e.ProtocolError()}, e + } + if blk == nil { + return &tbcapi.BlockDownloadAsyncRawResponse{}, nil + } + + rb, err := blk.Bytes() + if err != nil { + e := protocol.NewInternalError(err) + return &tbcapi.BlockDownloadAsyncRawResponse{Error: e.ProtocolError()}, e + } + return &tbcapi.BlockDownloadAsyncRawResponse{Block: rb}, nil +} + func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) { log.Tracef("handleWebsocket: %v", r.RemoteAddr) defer log.Tracef("handleWebsocket exit: %v", r.RemoteAddr) diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 593f8d7c1..a1e80c2f1 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -758,6 +758,28 @@ func (s *Server) downloadBlockFromRandomPeer(ctx context.Context, block *chainha return nil } +func (s *Server) DownloadBlockFromRandomPeers(ctx context.Context, block *chainhash.Hash, count uint) (*btcutil.Block, error) { + log.Tracef("DownloadBlockFromRandomPeers %v %v", count, block) + defer log.Tracef("DownloadBlockFromRandomPeers %v %v exit", count, block) + + blk, err := s.db.BlockByHash(ctx, block) + if err != nil { + if errors.Is(err, database.ErrBlockNotFound) { + for range count { + err := s.downloadBlockFromRandomPeer(ctx, block) + if err != nil { + log.Errorf("async download: %v", err) + continue + } + } + return nil, nil + } + return nil, err + } + + return blk, nil +} + func (s *Server) handleBlockExpired(ctx context.Context, key any, value any) error { log.Tracef("handleBlockExpired") defer log.Tracef("handleBlockExpired exit") @@ -1115,6 +1137,10 @@ func (s *Server) handleHeaders(ctx context.Context, p *rawpeer.RawPeer, msg *wir return nil } +func (s *Server) BlockInsert(ctx context.Context, blk *wire.MsgBlock) (int64, error) { + return s.db.BlockInsert(ctx, btcutil.NewBlock(blk)) +} + func (s *Server) handleBlock(ctx context.Context, p *rawpeer.RawPeer, msg *wire.MsgBlock, raw []byte) error { log.Tracef("handleBlock (%v)", p) defer log.Tracef("handleBlock exit (%v)", p)