Skip to content

Commit

Permalink
proof+tapcfg: add and use new MultiArchiveNotifier
Browse files Browse the repository at this point in the history
This commit creates a new notifier that can relay registrations for new
proofs to multiple notifier/archiver backends.
We then use that to pass in both the local assets store as well as the
multiverse store to the custodian to detect incoming proofs.
  • Loading branch information
guggero committed Dec 8, 2023
1 parent 8d7613c commit 7d0364a
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 1 deletion.
83 changes: 83 additions & 0 deletions proof/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,89 @@ type NotifyArchiver interface {
fn.EventPublisher[Blob, []*Locator]
}

// MultiArchiveNotifier is a NotifyArchiver that wraps several other archives
// and notifies subscribers about new proofs that are added to any of the
// archives.
type MultiArchiveNotifier struct {
archives []NotifyArchiver
}

// NewMultiArchiveNotifier creates a new MultiArchiveNotifier based on the set
// of specified backends.
func NewMultiArchiveNotifier(archives ...NotifyArchiver) *MultiArchiveNotifier {
return &MultiArchiveNotifier{
archives: archives,
}
}

// FetchProof fetches a proof for an asset uniquely identified by the passed
// Identifier. The returned proof can either be a full proof file or just a
// single proof.
//
// If a proof cannot be found, then ErrProofNotFound should be returned.
//
// NOTE: This is part of the NotifyArchiver interface.
func (m *MultiArchiveNotifier) FetchProof(ctx context.Context,
id Locator) (Blob, error) {

for idx := range m.archives {
a := m.archives[idx]

proofBlob, err := a.FetchProof(ctx, id)
if errors.Is(err, ErrProofNotFound) {
// Try the next archive.
continue
} else if err != nil {
return nil, fmt.Errorf("error fetching proof "+
"from archive: %w", err)
}

return proofBlob, nil
}

return nil, ErrProofNotFound
}

// RegisterSubscriber adds a new subscriber for receiving events. The
// registration request is forwarded to all registered archives.
func (m *MultiArchiveNotifier) RegisterSubscriber(
receiver *fn.EventReceiver[Blob], deliverExisting bool,
deliverFrom []*Locator) error {

for idx := range m.archives {
a := m.archives[idx]

err := a.RegisterSubscriber(
receiver, deliverExisting, deliverFrom,
)
if err != nil {
return fmt.Errorf("error registering subscriber: %w",
err)
}
}

return nil
}

// RemoveSubscriber removes the given subscriber and also stops it from
// processing events. The removal request is forwarded to all registered
// archives.
func (m *MultiArchiveNotifier) RemoveSubscriber(
subscriber *fn.EventReceiver[Blob]) error {

for idx := range m.archives {
a := m.archives[idx]

err := a.RemoveSubscriber(subscriber)
if err != nil {
return fmt.Errorf("error removing subscriber: "+
"%w", err)
}
}

return nil
}

// FileArchiver implements proof Archiver backed by an on-disk file system. The
// archiver takes a single root directory then creates the following overlap
// mapping:
Expand Down
4 changes: 3 additions & 1 deletion tapcfg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
ChainParams: &tapChainParams,
})

multiNotifier := proof.NewMultiArchiveNotifier(assetStore, multiverse)

return &tap.Config{
DebugLevel: cfg.DebugLevel,
RuntimeID: runtimeID,
Expand Down Expand Up @@ -364,7 +366,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
),
AddrBook: addrBook,
ProofArchive: proofArchive,
ProofNotifier: assetStore,
ProofNotifier: multiNotifier,
ErrChan: mainErrChan,
ProofCourierCfg: proofCourierCfg,
ProofRetrievalDelay: cfg.CustodianProofRetrievalDelay,
Expand Down

0 comments on commit 7d0364a

Please sign in to comment.