Skip to content

Commit

Permalink
multi: allow proof courier to short cut with local archive
Browse files Browse the repository at this point in the history
With this commit we allow the universe RPC courier to short cut the
iterative proof retrieval process if we arrive at a proof in the chain
that we already have in the local archive. And since the local archive
only stores full chains of proofs, once we get one from it, the full
retrieval process is complete.
  • Loading branch information
guggero committed Jan 9, 2024
1 parent 864b554 commit 45a8a65
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 21 deletions.
97 changes: 77 additions & 20 deletions proof/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ type CourierCfg struct {
// TransferLog is a log for recording proof delivery and retrieval
// attempts.
TransferLog TransferLog

// LocalArchive is an archive that can be used to fetch proofs from the
// local archive.
LocalArchive Archiver
}

// CourierDispatch is an interface that abstracts away the different proof
Expand Down Expand Up @@ -123,18 +127,14 @@ func (u *URLDispatch) NewCourier(addr *url.URL,
cfg.BackoffCfg, u.cfg.TransferLog,
)

hashMailCfg := HashMailCourierCfg{
ReceiverAckTimeout: cfg.ReceiverAckTimeout,
}

hashMailBox, err := NewHashMailBox(addr)
if err != nil {
return nil, fmt.Errorf("unable to make mailbox: %v",
err)
}

return &HashMailCourier{
cfg: &hashMailCfg,
cfg: u.cfg,
backoffHandle: backoffHandler,
recipient: recipient,
mailbox: hashMailBox,
Expand Down Expand Up @@ -165,7 +165,7 @@ func (u *URLDispatch) NewCourier(addr *url.URL,
recipient: recipient,
client: client,
backoffHandle: backoffHandler,
transfer: u.cfg.TransferLog,
cfg: u.cfg,
subscribers: subscribers,
rawConn: conn,
}, nil
Expand Down Expand Up @@ -593,9 +593,9 @@ func (b *BackoffHandler) initialDelay(ctx context.Context,
// Exec attempts to execute the given proof transfer function using a repeating
// backoff time delayed strategy. The backoff strategy is used to ensure that we
// don't spam the courier service with proof transfer attempts.
func (b *BackoffHandler) Exec(ctx context.Context,
proofLocator Locator, transferType TransferType,
transferFunc func() error, subscriberEvent func(fn.Event)) error {
func (b *BackoffHandler) Exec(ctx context.Context, proofLocator Locator,
transferType TransferType, transferFunc func() error,
subscriberEvent func(fn.Event)) error {

if b.cfg == nil {
return fmt.Errorf("backoff config not specified")
Expand Down Expand Up @@ -721,8 +721,8 @@ type HashMailCourierCfg struct {
// HashMailCourier is a hashmail proof courier service handle. It implements the
// Courier interface.
type HashMailCourier struct {
// cfg contains the courier's configuration parameters.
cfg *HashMailCourierCfg
// cfg is the general courier configuration.
cfg *CourierCfg

// backoffHandle is a handle to the backoff procedure used in proof
// delivery.
Expand Down Expand Up @@ -786,10 +786,10 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context,
// Wait to receive the ACK from the remote party over
// their stream.
log.Infof("Waiting (%v) for receiver ACK via sid=%x",
h.cfg.ReceiverAckTimeout, receiverStreamID)
h.cfg.HashMailCfg.ReceiverAckTimeout, receiverStreamID)

ctxTimeout, cancel := context.WithTimeout(
ctx, h.cfg.ReceiverAckTimeout,
ctx, h.cfg.HashMailCfg.ReceiverAckTimeout,
)
defer cancel()
err = h.mailbox.RecvAck(ctxTimeout, receiverStreamID)
Expand Down Expand Up @@ -1008,6 +1008,9 @@ type UniverseRpcCourier struct {
// the universe RPC server.
client unirpc.UniverseClient

// cfg is the general courier configuration.
cfg *CourierCfg

// rawConn is the raw connection that the courier will use to interact
// with the remote gRPC service.
rawConn *grpc.ClientConn
Expand All @@ -1016,10 +1019,6 @@ type UniverseRpcCourier struct {
// delivery.
backoffHandle *BackoffHandler

// transfer is the log that the courier will use to record the
// attempted delivery of proofs to the receiver.
transfer TransferLog

// subscribers is a map of components that want to be notified on new
// events, keyed by their subscription ID.
subscribers map[uint64]*fn.EventReceiver[fn.Event]
Expand Down Expand Up @@ -1188,7 +1187,9 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context,
return proofBlob, nil
}

proofFile, err := FetchProofProvenance(ctx, originLocator, fetchProof)
proofFile, err := FetchProofProvenance(
ctx, c.cfg.LocalArchive, originLocator, fetchProof,
)
if err != nil {
return nil, fmt.Errorf("error fetching proof provenance: %w",
err)
Expand Down Expand Up @@ -1270,14 +1271,19 @@ type TransferLog interface {
// FetchProofProvenance iterates backwards through the main chain of proofs
// until it reaches the genesis point (the asset is the genesis asset) and then
// returns the full proof file with the full provenance for the asset.
func FetchProofProvenance(ctx context.Context, originLocator Locator,
func FetchProofProvenance(ctx context.Context, localArchive Archiver,
originLocator Locator,
fetchSingleProof func(context.Context, Locator) (Blob, error)) (*File,
error) {

// In order to reconstruct the proof file we must collect all the
// transition proofs that make up the main chain of proofs. That is
// accomplished by iterating backwards through the main chain of proofs
// until we reach the genesis point (minting proof).
// until we reach the genesis point (minting proof). The local archive
// can be a proof file based archive or a single proof based archive. If
// we do get a full proof file from the local archive, we do have a
// shortcut to the full remaining provenance, and we can stop the
// iterative lookup.

// We will update the locator at each iteration.
currentLocator := originLocator
Expand All @@ -1287,6 +1293,57 @@ func FetchProofProvenance(ctx context.Context, originLocator Locator,
// is a reversal of that found in the proof file.
var reversedProofs []Blob
for {
// Before we attempt to fetch the proof from the potentially
// remote universe, we'll check our local archive to see if we
// already have it. This doesn't make sense in all cases, so we
// allow the local archive to be nil, in which case we just skip
// this step. The local universe might give us a single proof or
// a full file, depending on its type. If we get a full file, we
// are already done.
var (
fullProof Blob
err error
)
if localArchive != nil {
fullProof, err = localArchive.FetchProof(
ctx, currentLocator,
)
}

// If we don't have a local archive, then the IsFile() check
// returns false, and we skip this block.
if err == nil && fullProof.IsFile() {
// The file in the local archive contains the full
// provenance from the current proof back to the
// genesis.
proofFile, err := fullProof.AsFile()
if err != nil {
return nil, fmt.Errorf("unable to decode "+
"proof file: %w", err)
}

// So if this is the first proof we've fetched, we're
// already done.
if len(reversedProofs) == 0 {
return proofFile, nil
}

// If we have already fetched some proofs, we'll need to
// append them to the proof file we just fetched.
for i := len(reversedProofs) - 1; i >= 0; i-- {
err := proofFile.AppendProofRaw(
reversedProofs[i],
)
if err != nil {
return nil, fmt.Errorf("error "+
"appending proof to proof "+
"file: %w", err)
}
}

return proofFile, nil
}

// Setup proof receive/query routine and start backoff
// procedure.
proofBlob, err := fetchSingleProof(ctx, currentLocator)
Expand Down
1 change: 1 addition & 0 deletions tapcfg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
HashMailCfg: cfg.HashMailCourier,
UniverseRpcCfg: cfg.UniverseRpcCourier,
TransferLog: assetStore,
LocalArchive: proofArchive,
})

multiNotifier := proof.NewMultiArchiveNotifier(assetStore, multiverse)
Expand Down
4 changes: 3 additions & 1 deletion tapdb/multiverse.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,9 @@ func (b *MultiverseStore) FetchProof(ctx context.Context,
return proofs[0].Leaf.RawProof, nil
}

file, err := proof.FetchProofProvenance(ctx, originLocator, fetchProof)
file, err := proof.FetchProofProvenance(
ctx, nil, originLocator, fetchProof,
)
if err != nil {
return nil, fmt.Errorf("error fetching proof from archive: %w",
err)
Expand Down

0 comments on commit 45a8a65

Please sign in to comment.