diff --git a/thead/ipns/ipns.go b/thead/ipns/ipns.go index b7e3f88..5e9df20 100644 --- a/thead/ipns/ipns.go +++ b/thead/ipns/ipns.go @@ -1,2 +1,105 @@ // Package ipns provides implementation for pdcl head management based on ipns protocol. package ipns + +import ( + "context" + "fmt" + "path" + "time" + + "github.com/ipfs/go-cid" + shell "github.com/ipfs/go-ipfs-api" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multibase" + + "github.com/areknoster/public-distributed-commit-log/thead" +) + +var _ thead.Reader = (*BasicHeadReader)(nil) + +// BasicHeadReader get's topic head from given IPNS address +type BasicHeadReader struct { + sh *shell.Shell + resolver resolver + ipnsAddr string +} + +func NewBasicHeadReader(sh *shell.Shell, ipnsAddr string) *BasicHeadReader { + bhr := &BasicHeadReader{ + sh: sh, + resolver: newShellResolver(sh), + ipnsAddr: ipnsAddr, + } + bhr.ipnsAddr = ipnsAddr + return bhr +} + +func (hr *BasicHeadReader) ReadHead(ctx context.Context) (cid.Cid, error) { + id, err := hr.resolver.resolveIPNS(hr.ipnsAddr) + if err != nil { + return cid.Cid{}, err + } + return id, nil +} + +// BasicHeadSetter sets default IPFS daemon's IPNS key address to point to given CID +type BasicHeadSetter struct { + sh *shell.Shell +} + +func NewBasicHeadSetter(sh *shell.Shell) *BasicHeadSetter { + return &BasicHeadSetter{sh: sh} +} + +func (bhs *BasicHeadSetter) SetHead(ctx context.Context, cid cid.Cid) error { + ipfsAddr := path.Join("/ipfs/", cid.String()) + _, err := bhs.sh.PublishWithDetails(ipfsAddr, "", 24*time.Hour, 10*time.Minute, false) + if err != nil { + return fmt.Errorf("publishing ipns update to ipfs daemon: %v", err) + } + return nil +} + +// BasicHeadManager can be used by Sentinel to manage topic head +// Since consumers don't set topic's head, it should not be used by them. +// Use HeadReader implementation to get topic head for consumer +// and some other (e.g. memory or disk) implementation to store internal consumer offset +type BasicHeadManager struct { + *BasicHeadReader + *BasicHeadSetter +} + +// NewBasicHeadManager initializes BasicHeadManager with default daemon's key used as PK for topic's head +func NewBasicHeadManager(sh *shell.Shell) (BasicHeadManager, error) { + ipnsAddr, err := GetDaemonIPNSAddress(sh) + if err != nil { + return BasicHeadManager{}, fmt.Errorf("get daemon IPNS address: %w", err) + } + return BasicHeadManager{ + BasicHeadReader: NewBasicHeadReader(sh, ipnsAddr), + BasicHeadSetter: NewBasicHeadSetter(sh), + }, nil +} + +// GetDaemonIPNSAddress gets IPNS address attached to daemon's default key. +// In most scenarios it's to be used when initializing +// sentinel with some existing daemon to use its default +// ipns address as IPNS head. +func GetDaemonIPNSAddress(sh *shell.Shell) (string, error) { + // this implementation is extremely non-obvious + // because IPFS doesn't normally allow for finding IPNS address + // of given key unless some file is added to it. + resp, err := sh.ID() + if err != nil { + return "", fmt.Errorf("get IPFS ID: %w", err) + } + pid, err := peer.Decode(resp.ID) + if err != nil { + return "", fmt.Errorf("decode peer ID: %w", err) + } + ipnsAddr, err := peer.ToCid(pid).StringOfBase(multibase.Base36) + if err != nil { + return "", fmt.Errorf("encode ipns address: %w", err) + } + return ipnsAddr, nil +} diff --git a/thead/ipns/resolver.go b/thead/ipns/resolver.go new file mode 100644 index 0000000..97fd989 --- /dev/null +++ b/thead/ipns/resolver.go @@ -0,0 +1,36 @@ +package ipns + +import ( + "fmt" + "path" + "strings" + + "github.com/areknoster/public-distributed-commit-log/pdcl" + "github.com/ipfs/go-cid" + shell "github.com/ipfs/go-ipfs-api" +) + +type resolver interface { + // resolveIPNS finds IPFS address that's pointed by given IPNS address + resolveIPNS(ipnsName string) (ipfsAddress cid.Cid, err error) +} + +type shellResolver struct { + shell *shell.Shell +} + +func newShellResolver(shell *shell.Shell) *shellResolver { + return &shellResolver{shell: shell} +} + +func (m *shellResolver) resolveIPNS(ipnsName string) (ipfsAddress cid.Cid, err error) { + resolvedAddr, err := m.shell.Resolve(path.Join("/ipns/", ipnsName)) + if err != nil { + return cid.Undef, fmt.Errorf("resolve %s from IPNS: %w", ipnsName, err) + } + resolvedCid, err := pdcl.ParseCID(strings.TrimPrefix(resolvedAddr, "/ipfs/")) + if err != nil { + return cid.Undef, fmt.Errorf("parse resolved IPNS address to CID: %w", err) + } + return resolvedCid, nil +} diff --git a/thead/ipns/test/ipns_test.go b/thead/ipns/test/ipns_test.go new file mode 100644 index 0000000..1685d9a --- /dev/null +++ b/thead/ipns/test/ipns_test.go @@ -0,0 +1,42 @@ +// Package ipns provides implementation for pdcl head management based on ipns protocol. +package ipns + +import ( + "context" + "testing" + "time" + + "github.com/areknoster/public-distributed-commit-log/storage/message/ipfs" + "github.com/areknoster/public-distributed-commit-log/storage/pbcodec" + "github.com/areknoster/public-distributed-commit-log/test/testpb" + "github.com/areknoster/public-distributed-commit-log/thead" + . "github.com/areknoster/public-distributed-commit-log/thead/ipns" + shell "github.com/ipfs/go-ipfs-api" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestHeadManager_WriteReadHead(t *testing.T) { + if testing.Short() { + t.Skip("skipping because ipfs daemon is needed") + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + t.Cleanup(cancel) + + sh := shell.NewShell("localhost:5001") + + storage := ipfs.NewStorage(sh, pbcodec.ProtoBuf{}) + msg := testpb.MakeCurrentRandomTestMessage() + id, err := storage.Write(ctx, msg) + require.NoError(t, err) + t.Log("message written") + + var headManager thead.Manager + headManager, err = NewBasicHeadManager(sh) + require.NoError(t, err) + require.NoError(t, headManager.SetHead(ctx, id)) + + headCid, err := headManager.ReadHead(ctx) + require.NoError(t, err) + assert.Equal(t, id, headCid) +}