Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1/2] custodian: look up proofs in local universe as well #726

Merged
merged 11 commits into from
Jan 12, 2024
87 changes: 87 additions & 0 deletions proof/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,93 @@ type NotifyArchiver interface {
fn.EventPublisher[Blob, []*Locator]
}

// MultiArchiveNotifier is a NotifyArchiver that wraps several other archives
// and notifies subscribers about new proofs that are added to any of the
// archives.
type MultiArchiveNotifier struct {
guggero marked this conversation as resolved.
Show resolved Hide resolved
archives []NotifyArchiver
}

// NewMultiArchiveNotifier creates a new MultiArchiveNotifier based on the set
// of specified backends.
func NewMultiArchiveNotifier(archives ...NotifyArchiver) *MultiArchiveNotifier {
return &MultiArchiveNotifier{
archives: archives,
}
}

// FetchProof fetches a proof for an asset uniquely identified by the passed
// Identifier. The returned proof can either be a full proof file or just a
// single proof.
//
// If a proof cannot be found, then ErrProofNotFound should be returned.
//
// NOTE: This is part of the NotifyArchiver interface.
func (m *MultiArchiveNotifier) FetchProof(ctx context.Context,
id Locator) (Blob, error) {

for idx := range m.archives {
a := m.archives[idx]

proofBlob, err := a.FetchProof(ctx, id)
if errors.Is(err, ErrProofNotFound) {
// Try the next archive.
continue
} else if err != nil {
return nil, fmt.Errorf("error fetching proof "+
"from archive: %w", err)
}

return proofBlob, nil
}

return nil, ErrProofNotFound
}

// RegisterSubscriber adds a new subscriber for receiving events. The
// registration request is forwarded to all registered archives.
func (m *MultiArchiveNotifier) RegisterSubscriber(
receiver *fn.EventReceiver[Blob], deliverExisting bool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving review note to self: does the subscriber need to be ready to handle multiple subscriptions notifications from all the archives for the same proof file?

deliverFrom []*Locator) error {

for idx := range m.archives {
a := m.archives[idx]

err := a.RegisterSubscriber(
receiver, deliverExisting, deliverFrom,
)
if err != nil {
return fmt.Errorf("error registering subscriber: %w",
err)
}
}

return nil
}

// RemoveSubscriber removes the given subscriber and also stops it from
// processing events. The removal request is forwarded to all registered
// archives.
func (m *MultiArchiveNotifier) RemoveSubscriber(
subscriber *fn.EventReceiver[Blob]) error {

for idx := range m.archives {
a := m.archives[idx]

err := a.RemoveSubscriber(subscriber)
if err != nil {
return fmt.Errorf("error removing subscriber: "+
"%w", err)
}
}

return nil
}

// A compile-time interface to ensure MultiArchiveNotifier meets the
// NotifyArchiver interface.
var _ NotifyArchiver = (*MultiArchiveNotifier)(nil)

// FileArchiver implements proof Archiver backed by an on-disk file system. The
// archiver takes a single root directory then creates the following overlap
// mapping:
Expand Down
4 changes: 3 additions & 1 deletion tapcfg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
TransferLog: assetStore,
})

multiNotifier := proof.NewMultiArchiveNotifier(assetStore, multiverse)

return &tap.Config{
DebugLevel: cfg.DebugLevel,
RuntimeID: runtimeID,
Expand Down Expand Up @@ -361,7 +363,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
),
AddrBook: addrBook,
ProofArchive: proofArchive,
ProofNotifier: assetStore,
ProofNotifier: multiNotifier,
ErrChan: mainErrChan,
ProofCourierDispatcher: proofCourierDispatcher,
ProofRetrievalDelay: cfg.CustodianProofRetrievalDelay, ProofWatcher: reOrgWatcher,
Expand Down