From 7d0364aaaa175e64c9e2d329a4e837b8c4d65735 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Fri, 8 Dec 2023 16:21:33 +0100 Subject: [PATCH] proof+tapcfg: add and use new MultiArchiveNotifier This commit creates a new notifier that can relay registrations for new proofs to multiple notifier/archiver backends. We then use that to pass in both the local assets store as well as the multiverse store to the custodian to detect incoming proofs. --- proof/archive.go | 83 ++++++++++++++++++++++++++++++++++++++++++++++++ tapcfg/server.go | 4 ++- 2 files changed, 86 insertions(+), 1 deletion(-) diff --git a/proof/archive.go b/proof/archive.go index ff02c5b24..8afcb85d9 100644 --- a/proof/archive.go +++ b/proof/archive.go @@ -140,6 +140,89 @@ type NotifyArchiver interface { fn.EventPublisher[Blob, []*Locator] } +// MultiArchiveNotifier is a NotifyArchiver that wraps several other archives +// and notifies subscribers about new proofs that are added to any of the +// archives. +type MultiArchiveNotifier struct { + archives []NotifyArchiver +} + +// NewMultiArchiveNotifier creates a new MultiArchiveNotifier based on the set +// of specified backends. +func NewMultiArchiveNotifier(archives ...NotifyArchiver) *MultiArchiveNotifier { + return &MultiArchiveNotifier{ + archives: archives, + } +} + +// FetchProof fetches a proof for an asset uniquely identified by the passed +// Identifier. The returned proof can either be a full proof file or just a +// single proof. +// +// If a proof cannot be found, then ErrProofNotFound should be returned. +// +// NOTE: This is part of the NotifyArchiver interface. +func (m *MultiArchiveNotifier) FetchProof(ctx context.Context, + id Locator) (Blob, error) { + + for idx := range m.archives { + a := m.archives[idx] + + proofBlob, err := a.FetchProof(ctx, id) + if errors.Is(err, ErrProofNotFound) { + // Try the next archive. + continue + } else if err != nil { + return nil, fmt.Errorf("error fetching proof "+ + "from archive: %w", err) + } + + return proofBlob, nil + } + + return nil, ErrProofNotFound +} + +// RegisterSubscriber adds a new subscriber for receiving events. The +// registration request is forwarded to all registered archives. +func (m *MultiArchiveNotifier) RegisterSubscriber( + receiver *fn.EventReceiver[Blob], deliverExisting bool, + deliverFrom []*Locator) error { + + for idx := range m.archives { + a := m.archives[idx] + + err := a.RegisterSubscriber( + receiver, deliverExisting, deliverFrom, + ) + if err != nil { + return fmt.Errorf("error registering subscriber: %w", + err) + } + } + + return nil +} + +// RemoveSubscriber removes the given subscriber and also stops it from +// processing events. The removal request is forwarded to all registered +// archives. +func (m *MultiArchiveNotifier) RemoveSubscriber( + subscriber *fn.EventReceiver[Blob]) error { + + for idx := range m.archives { + a := m.archives[idx] + + err := a.RemoveSubscriber(subscriber) + if err != nil { + return fmt.Errorf("error removing subscriber: "+ + "%w", err) + } + } + + return nil +} + // FileArchiver implements proof Archiver backed by an on-disk file system. The // archiver takes a single root directory then creates the following overlap // mapping: diff --git a/tapcfg/server.go b/tapcfg/server.go index 37786b96b..9d842bff9 100644 --- a/tapcfg/server.go +++ b/tapcfg/server.go @@ -330,6 +330,8 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, ChainParams: &tapChainParams, }) + multiNotifier := proof.NewMultiArchiveNotifier(assetStore, multiverse) + return &tap.Config{ DebugLevel: cfg.DebugLevel, RuntimeID: runtimeID, @@ -364,7 +366,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, ), AddrBook: addrBook, ProofArchive: proofArchive, - ProofNotifier: assetStore, + ProofNotifier: multiNotifier, ErrChan: mainErrChan, ProofCourierCfg: proofCourierCfg, ProofRetrievalDelay: cfg.CustodianProofRetrievalDelay,