Skip to content

Commit

Permalink
feat: use new go-trustless-utils package
Browse files Browse the repository at this point in the history
Also includes a change of "path does not exist in DAG" handling introduced in
#16 from an error to a log message.
Ultimately it's up to the user what they do with the data and we can only
provide them with the DAG that proves that the path is not resolvable.
  • Loading branch information
rvagg committed Sep 2, 2023
1 parent df07453 commit 6e4f116
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 185 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/go-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,3 @@ concurrency:
jobs:
go-test:
uses: pl-strflt/uci/.github/workflows/[email protected]
with:
go-versions: '["this"]'
109 changes: 27 additions & 82 deletions carstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ import (
"io"

// codecs we care about
"github.com/filecoin-project/lassie/pkg/types"
dagpb "github.com/ipld/go-codec-dagpb"

_ "github.com/ipld/go-ipld-prime/codec/cbor"
_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
_ "github.com/ipld/go-ipld-prime/codec/dagjson"
_ "github.com/ipld/go-ipld-prime/codec/json"
_ "github.com/ipld/go-ipld-prime/codec/raw"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-trustless-utils/traversal"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-unixfsnode"
Expand All @@ -23,110 +22,56 @@ import (
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal"
trustlessutils "github.com/ipld/go-trustless-utils"
)

var protoChooser = dagpb.AddSupportToChooser(basicnode.Chooser)

// StreamCar streams a DAG in CARv1 format to the given writer, using the given
// selector.
func StreamCar(
ctx context.Context,
requestLsys linking.LinkSystem,
rootCid cid.Cid,
path datamodel.Path,
dagScope types.DagScope,
dagScope trustlessutils.DagScope,
out io.Writer,
duplicates bool,
) error {

selNode := unixfsnode.UnixFSPathSelectorBuilder(path.String(), dagScope.TerminalSelectorSpec(), false)
sel, err := selector.CompileSelector(selNode)
if err != nil {
return fmt.Errorf("failed to compile selector: %w", err)
}

carWriter, err := carstorage.NewWritable(out, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(duplicates))
if err != nil {
return fmt.Errorf("failed to create car writer: %w", err)
}

erro := &errorRecordingReadOpener{ctx, requestLsys.StorageReadOpener, carWriter, nil}
requestLsys.StorageReadOpener = erro.StorageReadOpener
requestLsys.StorageReadOpener = carPipe(requestLsys.StorageReadOpener, carWriter)

rootNode, err := loadNode(ctx, rootCid, requestLsys)
selNode := unixfsnode.UnixFSPathSelectorBuilder(path.String(), dagScope.TerminalSelectorSpec(), false)

cfg := traversal.Config{Root: rootCid, Selector: selNode}
lastPath, err := cfg.Traverse(ctx, requestLsys, nil)
if err != nil {
return fmt.Errorf("failed to load root node: %w", err)
return err
}

progress := traversal.Progress{Cfg: &traversal.Config{
Ctx: ctx,
LinkSystem: requestLsys,
LinkTargetNodePrototypeChooser: protoChooser,
}}
var lastPath datamodel.Path
visitor := func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error {
lastPath = p.Path
return nil
}
if err := progress.WalkAdv(rootNode, sel, visitor); err != nil {
return fmt.Errorf("failed to complete traversal: %w", err)
if err := traversal.CheckPath(path, lastPath); err != nil {
logger.Warnf("failed to traverse full requested path: %s", err)
}
if erro.err != nil {
return fmt.Errorf("block load failed during traversal: %w", erro.err)
}
for path.Len() > 0 {
if lastPath.Len() == 0 {
return fmt.Errorf("failed to traverse full path, missed: [%s]", path.String())
}
var seg, lastSeg datamodel.PathSegment
seg, path = path.Shift()
lastSeg, lastPath = lastPath.Shift()
if seg != lastSeg {
return fmt.Errorf("unexpected path segment visit, got [%s], expected [%s]", lastSeg.String(), seg.String())
}
}
// having lastPath.Len()>0 is fine, it may be due to an "all" or "entity"
// doing an explore-all on the remainder of the DAG after the path.

return nil
}

type errorRecordingReadOpener struct {
ctx context.Context
orig linking.BlockReadOpener
car carstorage.WritableCar
err error
}

func (erro *errorRecordingReadOpener) StorageReadOpener(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
r, err := erro.orig(lc, lnk)
if err != nil {
erro.err = err
return nil, err
}
byts, err := io.ReadAll(r)
if err != nil {
return nil, err
}
err = erro.car.Put(erro.ctx, lnk.(cidlink.Link).Cid.KeyString(), byts)
if err != nil {
return nil, err
}
return bytes.NewReader(byts), nil
}

func loadNode(ctx context.Context, rootCid cid.Cid, lsys linking.LinkSystem) (datamodel.Node, error) {
lnk := cidlink.Link{Cid: rootCid}
lnkCtx := linking.LinkContext{Ctx: ctx}
proto, err := protoChooser(lnk, lnkCtx)
if err != nil {
return nil, fmt.Errorf("failed to choose prototype for CID %s: %w", rootCid.String(), err)
}
rootNode, err := lsys.Load(lnkCtx, lnk, proto)
if err != nil {
return nil, fmt.Errorf("failed to load root CID: %w", err)
func carPipe(orig linking.BlockReadOpener, car carstorage.WritableCar) linking.BlockReadOpener {
return func(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
r, err := orig(lc, lnk)
if err != nil {
return nil, err
}
byts, err := io.ReadAll(r)
if err != nil {
return nil, err
}
err = car.Put(lc.Ctx, lnk.(cidlink.Link).Cid.KeyString(), byts)
if err != nil {
return nil, err
}
return bytes.NewReader(byts), nil
}
return rootNode, nil
}
34 changes: 21 additions & 13 deletions carstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io"
"testing"

"github.com/filecoin-project/lassie/pkg/types"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
gstestutil "github.com/ipfs/go-graphsync/testutil"
Expand All @@ -19,6 +18,7 @@ import (
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/memstore"
trustlessutils "github.com/ipld/go-trustless-utils"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -53,15 +53,15 @@ func TestStreamCar(t *testing.T) {
testCases := []struct {
name string
path datamodel.Path
scope types.DagScope
scope trustlessutils.DagScope
root cid.Cid
lsys linking.LinkSystem
validate func(t *testing.T, r io.Reader)
expectedErr string
}{
{
name: "chain: all blocks",
scope: types.DagScopeAll,
scope: trustlessutils.DagScopeAll,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
validate: func(t *testing.T, r io.Reader) {
Expand All @@ -72,7 +72,7 @@ func TestStreamCar(t *testing.T) {
},
{
name: "chain: just root",
scope: types.DagScopeBlock,
scope: trustlessutils.DagScopeBlock,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
validate: func(t *testing.T, r io.Reader) {
Expand All @@ -83,7 +83,7 @@ func TestStreamCar(t *testing.T) {
},
{
name: "unixfs file",
scope: types.DagScopeAll,
scope: trustlessutils.DagScopeAll,
root: fileEnt.Root,
lsys: fileLsys,
validate: func(t *testing.T, r io.Reader) {
Expand All @@ -94,7 +94,7 @@ func TestStreamCar(t *testing.T) {
},
{
name: "unixfs directory",
scope: types.DagScopeAll,
scope: trustlessutils.DagScopeAll,
root: dirEnt.Root,
lsys: dirLsys,
validate: func(t *testing.T, r io.Reader) {
Expand All @@ -105,7 +105,7 @@ func TestStreamCar(t *testing.T) {
},
{
name: "unixfs sharded directory",
scope: types.DagScopeAll,
scope: trustlessutils.DagScopeAll,
root: shardedDirEnt.Root,
lsys: shardedDirLsys,
validate: func(t *testing.T, r io.Reader) {
Expand All @@ -115,12 +115,20 @@ func TestStreamCar(t *testing.T) {
},
},
{
name: "unixfs sharded directory, error no such path",
scope: types.DagScopeAll,
path: datamodel.ParsePath(shardedDirEnt.Children[0].Path + "/nope"),
root: shardedDirEnt.Root,
lsys: shardedDirLsys,
expectedErr: "failed to traverse full path, missed: [nope]",
// path that (probably) doesn't exist, shouldn't error but shouldn't
// return much (this ought to be tested better with a fixture)
name: "unixfs sharded directory, no such path",
scope: trustlessutils.DagScopeAll,
path: datamodel.ParsePath(shardedDirEnt.Children[0].Path + "/nope"),
root: shardedDirEnt.Root,
lsys: shardedDirLsys,
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, shardedDirEnt.Root, root)
cids := blkCids(blks)
require.Contains(t, cids, shardedDirEnt.Root)
require.NotEqual(t, len(entCids(shardedDirEnt)), cids) // shouldn't contain the full thing!
},
},
}

Expand Down
22 changes: 5 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ go 1.20

require (
github.com/dustin/go-humanize v1.0.1
github.com/filecoin-project/lassie v0.17.0
github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-graphsync v0.14.7
github.com/ipfs/go-graphsync v0.14.8
github.com/ipfs/go-ipld-format v0.6.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/go-unixfsnode v1.8.0
github.com/ipld/go-car/v2 v2.11.0
github.com/ipld/go-codec-dagpb v1.6.0
github.com/ipld/go-ipld-prime v0.21.0
github.com/ipld/go-trustless-utils v0.0.0
github.com/ipni/go-libipni v0.3.4
github.com/ipni/index-provider v0.13.5
github.com/libp2p/go-libp2p v0.30.0
Expand All @@ -25,28 +24,24 @@ require (
)

require (
github.com/Jorropo/jsync v1.0.1 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bep/debounce v1.2.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/cskr/pubsub v1.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/filecoin-project/go-address v1.1.0 // indirect
github.com/filecoin-project/go-amt-ipld/v4 v4.2.0 // indirect
github.com/filecoin-project/go-cbor-util v0.0.1 // indirect
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7 // indirect
github.com/filecoin-project/go-ds-versioning v0.1.2 // indirect
github.com/filecoin-project/go-hamt-ipld/v3 v3.3.0 // indirect
github.com/filecoin-project/go-retrieval-types v1.2.0 // indirect
github.com/filecoin-project/go-state-types v0.13.0 // indirect
github.com/filecoin-project/go-statemachine v1.0.3 // indirect
github.com/filecoin-project/go-statestore v0.2.0 // indirect
Expand All @@ -59,7 +54,6 @@ require (
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
Expand All @@ -74,19 +68,17 @@ require (
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.6 // indirect
github.com/huin/goupnp v1.2.0 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/boxo v0.12.0 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-datastore v0.6.0 // indirect
github.com/ipfs/go-ipfs-blocksutil v0.0.1 // indirect
github.com/ipfs/go-ipfs-chunker v0.0.5 // indirect
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
github.com/ipfs/go-ipfs-util v0.0.3 // indirect
github.com/ipfs/go-ipld-cbor v0.1.0 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
github.com/ipfs/go-unixfs v0.4.5 // indirect
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
github.com/ipld/go-ipld-adl-hamt v0.0.0-20230814133645-9c9b7f7d771d // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c // indirect
Expand All @@ -102,8 +94,6 @@ require (
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-libp2p-gostream v0.6.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.9.3 // indirect
github.com/libp2p/go-libp2p-record v0.2.0 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.1 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-nat v0.2.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
Expand All @@ -115,9 +105,7 @@ require (
github.com/miekg/dns v1.1.55 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/go-server-timing v1.0.1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
Expand Down
Loading

0 comments on commit 6e4f116

Please sign in to comment.