Skip to content

Commit

Permalink
tbcapi, tbcd: expose async block download, add BlockInsert (#327)
Browse files Browse the repository at this point in the history
Co-authored-by: Joshua Sing <[email protected]>
  • Loading branch information
marcopeereboom and joshuasing authored Nov 25, 2024
1 parent 543d661 commit c492171
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 1 deletion.
35 changes: 35 additions & 0 deletions api/tbcapi/tbcapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ const (

CmdBlockInsertRawRequest = "tbcapi-block-insert-raw-request"
CmdBlockInsertRawResponse = "tbcapi-block-insert-raw-response"

CmdBlockDownloadAsyncRequest = "tbcapi-block-download-async-request"
CmdBlockDownloadAsyncResponse = "tbcapi-block-download-async-response"

CmdBlockDownloadAsyncRawRequest = "tbcapi-block-download-async-raw-request"
CmdBlockDownloadAsyncRawResponse = "tbcapi-block-download-async-raw-response"
)

var (
Expand Down Expand Up @@ -283,6 +289,31 @@ type BlockInsertRawResponse struct {
Error *protocol.Error `json:"error,omitempty"`
}

// BlockDownloadAsyncResponse returns a block if it exists or attempts to
// download the block from p2p asynchronously.
type BlockDownloadAsyncRequest struct {
Hash *chainhash.Hash `json:"hash"`
Peers uint `json:"peers"`
}

// BlockDownloadAsyncResponse replies with a block, an error or nothing. When
// bot Error and Block are nil it measn the block download request was issued
// to p2p.
type BlockDownloadAsyncResponse struct {
Block *wire.MsgBlock `json:"block,omitempty"`
Error *protocol.Error `json:"error,omitempty"`
}

type BlockDownloadAsyncRawRequest struct {
Hash *chainhash.Hash `json:"hash"`
Peers uint `json:"peers"`
}

type BlockDownloadAsyncRawResponse struct {
Block api.ByteSlice `json:"block,omitempty"`
Error *protocol.Error `json:"error,omitempty"`
}

var commands = map[protocol.Command]reflect.Type{
CmdPingRequest: reflect.TypeOf(PingRequest{}),
CmdPingResponse: reflect.TypeOf(PingResponse{}),
Expand Down Expand Up @@ -316,6 +347,10 @@ var commands = map[protocol.Command]reflect.Type{
CmdBlockInsertResponse: reflect.TypeOf(BlockInsertResponse{}),
CmdBlockInsertRawRequest: reflect.TypeOf(BlockInsertRawRequest{}),
CmdBlockInsertRawResponse: reflect.TypeOf(BlockInsertRawResponse{}),
CmdBlockDownloadAsyncRequest: reflect.TypeOf(BlockDownloadAsyncRequest{}),
CmdBlockDownloadAsyncResponse: reflect.TypeOf(BlockDownloadAsyncResponse{}),
CmdBlockDownloadAsyncRawRequest: reflect.TypeOf(BlockDownloadAsyncRawRequest{}),
CmdBlockDownloadAsyncRawResponse: reflect.TypeOf(BlockDownloadAsyncRawResponse{}),
}

type tbcAPI struct{}
Expand Down
6 changes: 5 additions & 1 deletion cmd/hemictl/hemictl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import (
const (
daemonName = "hemictl"
defaultLogLevel = daemonName + "=INFO;bfgpostgres=INFO;postgres=INFO;protocol=INFO"

tbcReadLimit = 8 * (1 << 20) // 8 MiB.
)

var (
Expand Down Expand Up @@ -1153,7 +1155,9 @@ func _main() error {
default:
return fmt.Errorf("can't derive URL from command: %v", cmd)
}
conn, err := protocol.NewConn(u, nil)
conn, err := protocol.NewConn(u, &protocol.ConnOptions{
ReadLimit: tbcReadLimit,
})
if err != nil {
return err
}
Expand Down
76 changes: 76 additions & 0 deletions service/tbc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,20 @@ func (s *Server) handleWebsocketRead(ctx context.Context, ws *tbcWs) {
return s.handleBlockInsertRawRequest(ctx, req)
}

go s.handleRequest(ctx, ws, id, cmd, handler)
case tbcapi.CmdBlockDownloadAsyncRequest:
handler := func(ctx context.Context) (any, error) {
req := payload.(*tbcapi.BlockDownloadAsyncRequest)
return s.handleBlockDownloadAsyncRequest(ctx, req)
}

go s.handleRequest(ctx, ws, id, cmd, handler)
case tbcapi.CmdBlockDownloadAsyncRawRequest:
handler := func(ctx context.Context) (any, error) {
req := payload.(*tbcapi.BlockDownloadAsyncRawRequest)
return s.handleBlockDownloadAsyncRawRequest(ctx, req)
}

go s.handleRequest(ctx, ws, id, cmd, handler)
default:
err = fmt.Errorf("unknown command: %v", cmd)
Expand Down Expand Up @@ -613,6 +627,68 @@ func (s *Server) handleBlockInsertRawRequest(ctx context.Context, req *tbcapi.Bl
return &tbcapi.BlockInsertRawResponse{BlockHash: &hash}, nil
}

// handleBlockDownloadAsyncRequest handles tbcapi.BlockDownloadAsyncRequest.
func (s *Server) handleBlockDownloadAsyncRequest(ctx context.Context, req *tbcapi.BlockDownloadAsyncRequest) (any, error) {
log.Tracef("handleBlockAsyncDownloadRequest")
defer log.Tracef("handleBlockAsyncDownloadRequest exit")

if req.Hash == nil {
return &tbcapi.BlockDownloadAsyncResponse{
Error: protocol.RequestErrorf("hash must be provided"),
}, nil
}
if req.Peers <= 0 || req.Peers > 5 {
return &tbcapi.BlockDownloadAsyncResponse{
Error: protocol.RequestErrorf("invalid peers"),
}, nil
}

blk, err := s.DownloadBlockFromRandomPeers(ctx, req.Hash, req.Peers)
if err != nil {
e := protocol.NewInternalError(err)
return &tbcapi.BlockDownloadAsyncRawResponse{Error: e.ProtocolError()}, e
}
if blk == nil {
// Block will be downloaded in the background, asynchronously.
return &tbcapi.BlockDownloadAsyncResponse{}, nil
}
return &tbcapi.BlockDownloadAsyncResponse{Block: blk.MsgBlock()}, nil
}

// handleBlockDownloadAsyncRawRequest handles tbcapi.BlockDownloadAsyncRawRequest.
func (s *Server) handleBlockDownloadAsyncRawRequest(ctx context.Context, req *tbcapi.BlockDownloadAsyncRawRequest) (any, error) {
log.Tracef("handleBlockDownloadAsyncRawRequest")
defer log.Tracef("handleBlockDownloadAsyncRawRequest exit")

if req.Hash == nil {
return &tbcapi.BlockDownloadAsyncRawResponse{
Error: protocol.RequestErrorf("hash must be provided"),
}, nil
}
if req.Peers <= 0 || req.Peers > 5 {
return &tbcapi.BlockDownloadAsyncRawResponse{
Error: protocol.RequestErrorf("too many peers"),
}, nil
}

blk, err := s.DownloadBlockFromRandomPeers(ctx, req.Hash, req.Peers)
if err != nil {
e := protocol.NewInternalError(err)
return &tbcapi.BlockDownloadAsyncRawResponse{Error: e.ProtocolError()}, e
}
if blk == nil {
// Block will be downloaded in the background, asynchronously.
return &tbcapi.BlockDownloadAsyncRawResponse{}, nil
}

rb, err := blk.Bytes()
if err != nil {
e := protocol.NewInternalError(err)
return &tbcapi.BlockDownloadAsyncRawResponse{Error: e.ProtocolError()}, e
}
return &tbcapi.BlockDownloadAsyncRawResponse{Block: rb}, nil
}

func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) {
log.Tracef("handleWebsocket: %v", r.RemoteAddr)
defer log.Tracef("handleWebsocket exit: %v", r.RemoteAddr)
Expand Down
26 changes: 26 additions & 0 deletions service/tbc/tbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,28 @@ func (s *Server) downloadBlockFromRandomPeer(ctx context.Context, block *chainha
return nil
}

func (s *Server) DownloadBlockFromRandomPeers(ctx context.Context, block *chainhash.Hash, count uint) (*btcutil.Block, error) {
log.Tracef("DownloadBlockFromRandomPeers %v %v", count, block)
defer log.Tracef("DownloadBlockFromRandomPeers %v %v exit", count, block)

blk, err := s.db.BlockByHash(ctx, block)
if err != nil {
if errors.Is(err, database.ErrBlockNotFound) {
for range count {
err := s.downloadBlockFromRandomPeer(ctx, block)
if err != nil {
log.Errorf("async download: %v", err)
continue
}
}
return nil, nil
}
return nil, err
}

return blk, nil
}

func (s *Server) handleBlockExpired(ctx context.Context, key any, value any) error {
log.Tracef("handleBlockExpired")
defer log.Tracef("handleBlockExpired exit")
Expand Down Expand Up @@ -1115,6 +1137,10 @@ func (s *Server) handleHeaders(ctx context.Context, p *rawpeer.RawPeer, msg *wir
return nil
}

func (s *Server) BlockInsert(ctx context.Context, blk *wire.MsgBlock) (int64, error) {
return s.db.BlockInsert(ctx, btcutil.NewBlock(blk))
}

func (s *Server) handleBlock(ctx context.Context, p *rawpeer.RawPeer, msg *wire.MsgBlock, raw []byte) error {
log.Tracef("handleBlock (%v)", p)
defer log.Tracef("handleBlock exit (%v)", p)
Expand Down

0 comments on commit c492171

Please sign in to comment.