From 940dde4517c188b6b67e4f5097d1152cae7ae6b6 Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Sun, 29 Sep 2024 03:02:13 -0700
Subject: [PATCH 1/6] Return errors using channles and not embedded in result
type
Asynchronous functions should return errors over channels instead of embedding the error in the result type.
Closes #9974
---
client/rpc/pin.go | 46 +++++++++---------
client/rpc/unixfs.go | 47 +++++++++----------
core/commands/ls.go | 11 ++---
core/commands/pin/pin.go | 12 +----
core/commands/pin/remotepin.go | 35 +++++++-------
core/coreapi/pin.go | 48 +++++++++++--------
core/coreapi/unixfs.go | 86 ++++++++++++++++++++++++----------
core/coreiface/pin.go | 5 +-
core/coreiface/tests/block.go | 7 ++-
core/coreiface/tests/pin.go | 18 +++----
core/coreiface/tests/unixfs.go | 52 +++++++++++---------
core/coreiface/unixfs.go | 51 ++++++++++++++++++--
12 files changed, 248 insertions(+), 170 deletions(-)
diff --git a/client/rpc/pin.go b/client/rpc/pin.go
index 6e8e942ac0e..eb2aa316cf1 100644
--- a/client/rpc/pin.go
+++ b/client/rpc/pin.go
@@ -62,10 +62,16 @@ type pinLsObject struct {
Type string
}
-func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, error) {
+func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, <-chan error) {
+ pins := make(chan iface.Pin)
+ errOut := make(chan error, 1)
+
options, err := caopts.PinLsOptions(opts...)
if err != nil {
- return nil, err
+ errOut <- err
+ close(pins)
+ close(errOut)
+ return pins, errOut
}
res, err := api.core().Request("pin/ls").
@@ -73,38 +79,32 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan i
Option("stream", true).
Send(ctx)
if err != nil {
- return nil, err
+ errOut <- err
+ close(pins)
+ close(errOut)
+ return pins, errOut
}
- pins := make(chan iface.Pin)
- go func(ch chan<- iface.Pin) {
+ go func(ch chan<- iface.Pin, errCh chan<- error) {
defer res.Output.Close()
defer close(ch)
+ defer close(errCh)
dec := json.NewDecoder(res.Output)
var out pinLsObject
for {
- switch err := dec.Decode(&out); err {
- case nil:
- case io.EOF:
- return
- default:
- select {
- case ch <- pin{err: err}:
- return
- case <-ctx.Done():
- return
+ err := dec.Decode(&out)
+ if err != nil {
+ if err != io.EOF {
+ errCh <- err
}
+ return
}
c, err := cid.Parse(out.Cid)
if err != nil {
- select {
- case ch <- pin{err: err}:
- return
- case <-ctx.Done():
- return
- }
+ errCh <- err
+ return
}
select {
@@ -113,8 +113,8 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan i
return
}
}
- }(pins)
- return pins, nil
+ }(pins, errOut)
+ return pins, errOut
}
// IsPinned returns whether or not the given cid is pinned
diff --git a/client/rpc/unixfs.go b/client/rpc/unixfs.go
index 3ba2c1c15f0..c913db67a8f 100644
--- a/client/rpc/unixfs.go
+++ b/client/rpc/unixfs.go
@@ -144,10 +144,16 @@ type lsOutput struct {
Objects []lsObject
}
-func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, error) {
+func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, <-chan error) {
+ out := make(chan iface.DirEntry)
+ errOut := make(chan error, 1)
+
options, err := caopts.UnixfsLsOptions(opts...)
if err != nil {
- return nil, err
+ errOut <- err
+ close(out)
+ close(errOut)
+ return out, errOut
}
resp, err := api.core().Request("ls", p.String()).
@@ -156,45 +162,41 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs
Option("stream", true).
Send(ctx)
if err != nil {
- return nil, err
+ errOut <- err
+ close(out)
+ close(errOut)
+ return out, errOut
}
if resp.Error != nil {
- return nil, resp.Error
+ errOut <- resp.Error
+ close(out)
+ close(errOut)
+ return out, errOut
}
dec := json.NewDecoder(resp.Output)
- out := make(chan iface.DirEntry)
go func() {
defer resp.Close()
defer close(out)
+ defer close(errOut)
for {
var link lsOutput
if err := dec.Decode(&link); err != nil {
- if err == io.EOF {
- return
- }
- select {
- case out <- iface.DirEntry{Err: err}:
- case <-ctx.Done():
+ if err != io.EOF {
+ errOut <- err
}
return
}
if len(link.Objects) != 1 {
- select {
- case out <- iface.DirEntry{Err: errors.New("unexpected Objects len")}:
- case <-ctx.Done():
- }
+ errOut <- errors.New("unexpected Objects len")
return
}
if len(link.Objects[0].Links) != 1 {
- select {
- case out <- iface.DirEntry{Err: errors.New("unexpected Links len")}:
- case <-ctx.Done():
- }
+ errOut <- errors.New("unexpected Links len")
return
}
@@ -202,10 +204,7 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs
c, err := cid.Decode(l0.Hash)
if err != nil {
- select {
- case out <- iface.DirEntry{Err: err}:
- case <-ctx.Done():
- }
+ errOut <- err
return
}
@@ -235,7 +234,7 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs
}
}()
- return out, nil
+ return out, errOut
}
func (api *UnixfsAPI) core() *HttpApi {
diff --git a/core/commands/ls.go b/core/commands/ls.go
index ab914bb0e15..15db7ff53d7 100644
--- a/core/commands/ls.go
+++ b/core/commands/ls.go
@@ -139,17 +139,11 @@ The JSON output contains type information.
return err
}
- results, err := api.Unixfs().Ls(req.Context, pth,
+ results, errCh := api.Unixfs().Ls(req.Context, pth,
options.Unixfs.ResolveChildren(resolveSize || resolveType))
- if err != nil {
- return err
- }
processLink, dirDone = processDir()
for link := range results {
- if link.Err != nil {
- return link.Err
- }
var ftype unixfs_pb.Data_DataType
switch link.Type {
case iface.TFile:
@@ -174,6 +168,9 @@ The JSON output contains type information.
return err
}
}
+ if err = <-errCh; err != nil {
+ return err
+ }
dirDone(i)
}
return done()
diff --git a/core/commands/pin/pin.go b/core/commands/pin/pin.go
index b87760aaff4..bd9c43a8326 100644
--- a/core/commands/pin/pin.go
+++ b/core/commands/pin/pin.go
@@ -557,15 +557,8 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
panic("unhandled pin type")
}
- pins, err := api.Pin().Ls(req.Context, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
- if err != nil {
- return err
- }
-
+ pins, errCh := api.Pin().Ls(req.Context, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
for p := range pins {
- if err := p.Err(); err != nil {
- return err
- }
err = emit(PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: p.Type(),
@@ -577,8 +570,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
return err
}
}
-
- return nil
+ return <-errCh
}
const (
diff --git a/core/commands/pin/remotepin.go b/core/commands/pin/remotepin.go
index 3721913e77c..40cb515c5f3 100644
--- a/core/commands/pin/remotepin.go
+++ b/core/commands/pin/remotepin.go
@@ -285,19 +285,15 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states.
cmds.DelimitedStringsOption(",", pinStatusOptionName, "Return pins with the specified statuses (queued,pinning,pinned,failed).").WithDefault([]string{"pinned"}),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
- ctx, cancel := context.WithCancel(req.Context)
- defer cancel()
-
c, err := getRemotePinServiceFromRequest(req, env)
if err != nil {
return err
}
- psCh, errCh, err := lsRemote(ctx, req, c)
- if err != nil {
- return err
- }
+ ctx, cancel := context.WithCancel(req.Context)
+ defer cancel()
+ psCh, errCh := lsRemote(ctx, req, c)
for ps := range psCh {
if err := res.Emit(toRemotePinOutput(ps)); err != nil {
return err
@@ -317,7 +313,7 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states.
}
// Executes GET /pins/?query-with-filters
-func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (chan pinclient.PinStatusGetter, chan error, error) {
+func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (<-chan pinclient.PinStatusGetter, <-chan error) {
opts := []pinclient.LsOption{}
if name, nameFound := req.Options[pinNameOptionName]; nameFound {
nameStr := name.(string)
@@ -330,7 +326,12 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (chan
for _, rawCID := range cidsRawArr {
parsedCID, err := cid.Decode(rawCID)
if err != nil {
- return nil, nil, fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err)
+ psCh := make(chan pinclient.PinStatusGetter)
+ errCh := make(chan error, 1)
+ errCh <- fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err)
+ close(psCh)
+ close(errCh)
+ return psCh, errCh
}
parsedCIDs = append(parsedCIDs, parsedCID)
}
@@ -342,16 +343,19 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (chan
for _, rawStatus := range statusRawArr {
s := pinclient.Status(rawStatus)
if s.String() == string(pinclient.StatusUnknown) {
- return nil, nil, fmt.Errorf("status %q is not valid", rawStatus)
+ psCh := make(chan pinclient.PinStatusGetter)
+ errCh := make(chan error, 1)
+ errCh <- fmt.Errorf("status %q is not valid", rawStatus)
+ close(psCh)
+ close(errCh)
+ return psCh, errCh
}
parsedStatuses = append(parsedStatuses, s)
}
opts = append(opts, pinclient.PinOpts.FilterStatus(parsedStatuses...))
}
- psCh, errCh := c.Ls(ctx, opts...)
-
- return psCh, errCh, nil
+ return c.Ls(ctx, opts...)
}
var rmRemotePinCmd = &cmds.Command{
@@ -403,10 +407,7 @@ To list and then remove all pending pin requests, pass an explicit status list:
rmIDs := []string{}
if len(req.Arguments) == 0 {
- psCh, errCh, err := lsRemote(ctx, req, c)
- if err != nil {
- return err
- }
+ psCh, errCh := lsRemote(ctx, req, c)
for ps := range psCh {
rmIDs = append(rmIDs, ps.GetRequestId())
}
diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go
index 64c65b651c2..c343f4e25c9 100644
--- a/core/coreapi/pin.go
+++ b/core/coreapi/pin.go
@@ -51,13 +51,18 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp
return api.pinning.Flush(ctx)
}
-func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan coreiface.Pin, error) {
+func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan coreiface.Pin, <-chan error) {
ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Ls")
defer span.End()
settings, err := caopts.PinLsOptions(opts...)
if err != nil {
- return nil, err
+ outCh := make(chan coreiface.Pin)
+ errCh := make(chan error, 1)
+ errCh <- err
+ close(outCh)
+ close(errCh)
+ return outCh, errCh
}
span.SetAttributes(attribute.String("type", settings.Type))
@@ -65,10 +70,15 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan c
switch settings.Type {
case "all", "direct", "indirect", "recursive":
default:
- return nil, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type)
+ outCh := make(chan coreiface.Pin)
+ errCh := make(chan error, 1)
+ errCh <- fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type)
+ close(outCh)
+ close(errCh)
+ return outCh, errCh
}
- return api.pinLsAll(ctx, settings.Type, settings.Detailed, settings.Name), nil
+ return api.pinLsAll(ctx, settings.Type, settings.Detailed, settings.Name)
}
func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) {
@@ -230,6 +240,7 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro
}
out := make(chan coreiface.PinStatus)
+
go func() {
defer close(out)
for p := range api.pinning.RecursiveKeys(ctx, false) {
@@ -254,7 +265,6 @@ type pinInfo struct {
pinType string
path path.ImmutablePath
name string
- err error
}
func (p *pinInfo) Path() path.ImmutablePath {
@@ -269,17 +279,12 @@ func (p *pinInfo) Name() string {
return p.name
}
-func (p *pinInfo) Err() error {
- return p.err
-}
-
// pinLsAll is an internal function for returning a list of pins
//
// The caller must keep reading results until the channel is closed to prevent
// leaking the goroutine that is fetching pins.
-func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, name string) <-chan coreiface.Pin {
+func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, name string) (<-chan coreiface.Pin, <-chan error) {
out := make(chan coreiface.Pin, 1)
-
emittedSet := cid.NewSet()
AddToResultKeys := func(c cid.Cid, pinName, typeStr string) error {
@@ -297,19 +302,22 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool,
return nil
}
+ errOut := make(chan error, 1)
+
go func() {
defer close(out)
+ defer close(errOut)
var rkeys []cid.Cid
var err error
if typeStr == "recursive" || typeStr == "all" {
for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
if streamedCid.Err != nil {
- out <- &pinInfo{err: streamedCid.Err}
+ errOut <- streamedCid.Err
return
}
if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "recursive"); err != nil {
- out <- &pinInfo{err: err}
+ errOut <- err
return
}
rkeys = append(rkeys, streamedCid.Pin.Key)
@@ -318,11 +326,11 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool,
if typeStr == "direct" || typeStr == "all" {
for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
if streamedCid.Err != nil {
- out <- &pinInfo{err: streamedCid.Err}
+ errOut <- streamedCid.Err
return
}
if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "direct"); err != nil {
- out <- &pinInfo{err: err}
+ errOut <- err
return
}
}
@@ -333,7 +341,7 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool,
for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
if streamedCid.Err != nil {
- out <- &pinInfo{err: streamedCid.Err}
+ errOut <- streamedCid.Err
return
}
emittedSet.Add(streamedCid.Pin.Key)
@@ -341,7 +349,7 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool,
for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
if streamedCid.Err != nil {
- out <- &pinInfo{err: streamedCid.Err}
+ errOut <- streamedCid.Err
return
}
emittedSet.Add(streamedCid.Pin.Key)
@@ -362,7 +370,7 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool,
}
err := AddToResultKeys(c, "", "indirect")
if err != nil {
- out <- &pinInfo{err: err}
+ errOut <- err
return false
}
return true
@@ -370,14 +378,14 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool,
merkledag.SkipRoot(), merkledag.Concurrent(),
)
if err != nil {
- out <- &pinInfo{err: err}
+ errOut <- err
return
}
}
}
}()
- return out
+ return out, errOut
}
func (api *PinAPI) core() coreiface.CoreAPI {
diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go
index dbeeefda426..b7134b0ffed 100644
--- a/core/coreapi/unixfs.go
+++ b/core/coreapi/unixfs.go
@@ -197,13 +197,18 @@ func (api *UnixfsAPI) Get(ctx context.Context, p path.Path) (files.Node, error)
// Ls returns the contents of an IPFS or IPNS object(s) at path p, with the format:
// ` `
-func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.UnixfsLsOption) (<-chan coreiface.DirEntry, error) {
+func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.UnixfsLsOption) (<-chan coreiface.DirEntry, <-chan error) {
ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "Ls", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()
settings, err := options.UnixfsLsOptions(opts...)
if err != nil {
- return nil, err
+ errOut := make(chan error, 1)
+ errOut <- err
+ close(errOut)
+ out := make(chan coreiface.DirEntry)
+ close(out)
+ return out, errOut
}
span.SetAttributes(attribute.Bool("resolvechildren", settings.ResolveChildren))
@@ -213,21 +218,31 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.Unixf
dagnode, err := ses.ResolveNode(ctx, p)
if err != nil {
- return nil, err
+ errOut := make(chan error, 1)
+ errOut <- err
+ close(errOut)
+ out := make(chan coreiface.DirEntry)
+ close(out)
+ return out, errOut
}
dir, err := uio.NewDirectoryFromNode(ses.dag, dagnode)
- if err == uio.ErrNotADir {
- return uses.lsFromLinks(ctx, dagnode.Links(), settings)
- }
if err != nil {
- return nil, err
+ if err == uio.ErrNotADir {
+ return uses.lsFromLinks(ctx, dagnode.Links(), settings)
+ }
+ errOut := make(chan error, 1)
+ errOut <- err
+ close(errOut)
+ out := make(chan coreiface.DirEntry)
+ close(out)
+ return out, errOut
}
return uses.lsFromLinksAsync(ctx, dir, settings)
}
-func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) coreiface.DirEntry {
+func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) (coreiface.DirEntry, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "ProcessLink")
defer span.End()
if linkres.Link != nil {
@@ -235,7 +250,7 @@ func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, se
}
if linkres.Err != nil {
- return coreiface.DirEntry{Err: linkres.Err}
+ return coreiface.DirEntry{}, linkres.Err
}
lnk := coreiface.DirEntry{
@@ -252,15 +267,13 @@ func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, se
if settings.ResolveChildren {
linkNode, err := linkres.Link.GetNode(ctx, api.dag)
if err != nil {
- lnk.Err = err
- break
+ return coreiface.DirEntry{}, err
}
if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
d, err := ft.FSNodeFromBytes(pn.Data())
if err != nil {
- lnk.Err = err
- break
+ return coreiface.DirEntry{}, err
}
switch d.Type() {
case ft.TFile, ft.TRaw:
@@ -284,35 +297,58 @@ func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, se
}
}
- return lnk
+ return lnk, nil
}
-func (api *UnixfsAPI) lsFromLinksAsync(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, error) {
+func (api *UnixfsAPI) lsFromLinksAsync(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, <-chan error) {
out := make(chan coreiface.DirEntry, uio.DefaultShardWidth)
+ errOut := make(chan error, 1)
go func() {
defer close(out)
+ defer close(errOut)
+
for l := range dir.EnumLinksAsync(ctx) {
+ dirEnt, err := api.processLink(ctx, l, settings) // TODO: perf: processing can be done in background and in parallel
+ if err != nil {
+ errOut <- err
+ return
+ }
select {
- case out <- api.processLink(ctx, l, settings): // TODO: perf: processing can be done in background and in parallel
+ case out <- dirEnt:
case <-ctx.Done():
return
}
}
}()
- return out, nil
+ return out, errOut
}
-func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, error) {
- links := make(chan coreiface.DirEntry, len(ndlinks))
- for _, l := range ndlinks {
- lr := ft.LinkResult{Link: &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}}
+func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, <-chan error) {
+ out := make(chan coreiface.DirEntry, uio.DefaultShardWidth)
+ errOut := make(chan error, 1)
- links <- api.processLink(ctx, lr, settings) // TODO: can be parallel if settings.Async
- }
- close(links)
- return links, nil
+ go func() {
+ defer close(out)
+ defer close(errOut)
+
+ for _, l := range ndlinks {
+ lr := ft.LinkResult{Link: &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}}
+ dirEnt, err := api.processLink(ctx, lr, settings) // TODO: perf: processing can be done in background and in parallel
+ if err != nil {
+ errOut <- err
+ return
+ }
+ select {
+ case out <- dirEnt:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return out, errOut
}
func (api *UnixfsAPI) core() *CoreAPI {
diff --git a/core/coreiface/pin.go b/core/coreiface/pin.go
index ed837fc9ce2..39cfc5941da 100644
--- a/core/coreiface/pin.go
+++ b/core/coreiface/pin.go
@@ -18,9 +18,6 @@ type Pin interface {
// Type of the pin
Type() string
-
- // if not nil, an error happened. Everything else should be ignored.
- Err() error
}
// PinStatus holds information about pin health
@@ -51,7 +48,7 @@ type PinAPI interface {
Add(context.Context, path.Path, ...options.PinAddOption) error
// Ls returns list of pinned objects on this node
- Ls(context.Context, ...options.PinLsOption) (<-chan Pin, error)
+ Ls(context.Context, ...options.PinLsOption) (<-chan Pin, <-chan error)
// IsPinned returns whether or not the given cid is pinned
// and an explanation of why its pinned
diff --git a/core/coreiface/tests/block.go b/core/coreiface/tests/block.go
index 3b4ca0bc05d..f59bf1fa871 100644
--- a/core/coreiface/tests/block.go
+++ b/core/coreiface/tests/block.go
@@ -323,9 +323,14 @@ func (tp *TestSuite) TestBlockPin(t *testing.T) {
t.Fatal(err)
}
- if pins, err := api.Pin().Ls(ctx); err != nil || len(pins) != 0 {
+ pinsCh, errCh := api.Pin().Ls(ctx)
+ _, ok := <-pinsCh
+ if ok {
t.Fatal("expected 0 pins")
}
+ if err = <-errCh; err != nil {
+ t.Fatal(err)
+ }
res, err := api.Block().Put(
ctx,
diff --git a/core/coreiface/tests/pin.go b/core/coreiface/tests/pin.go
index fdd7c15ccbf..a134854931c 100644
--- a/core/coreiface/tests/pin.go
+++ b/core/coreiface/tests/pin.go
@@ -593,19 +593,15 @@ func assertNotPinned(t *testing.T, ctx context.Context, api iface.CoreAPI, p pat
}
}
-func accPins(pins <-chan iface.Pin, err error) ([]iface.Pin, error) {
- if err != nil {
- return nil, err
- }
-
- var result []iface.Pin
+func accPins(pins <-chan iface.Pin, errs <-chan error) ([]iface.Pin, error) {
+ var results []iface.Pin
for pin := range pins {
- if pin.Err() != nil {
- return nil, pin.Err()
- }
- result = append(result, pin)
+ results = append(results, pin)
+ }
+ if err := <-errs; err != nil {
+ return nil, err
}
- return result, nil
+ return results, nil
}
diff --git a/core/coreiface/tests/unixfs.go b/core/coreiface/tests/unixfs.go
index 9d3362b9aff..1ff4f6f2c5a 100644
--- a/core/coreiface/tests/unixfs.go
+++ b/core/coreiface/tests/unixfs.go
@@ -681,14 +681,11 @@ func (tp *TestSuite) TestLs(t *testing.T) {
t.Fatal(err)
}
- entries, err := api.Unixfs().Ls(ctx, p)
- if err != nil {
- t.Fatal(err)
- }
+ entries, errCh := api.Unixfs().Ls(ctx, p)
- entry := <-entries
- if entry.Err != nil {
- t.Fatal(entry.Err)
+ entry, ok := <-entries
+ if !ok {
+ t.Fatal("expected another entry")
}
if entry.Size != 15 {
t.Errorf("expected size = 15, got %d", entry.Size)
@@ -702,9 +699,9 @@ func (tp *TestSuite) TestLs(t *testing.T) {
if entry.Cid.String() != "QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr" {
t.Errorf("expected cid = QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr, got %s", entry.Cid)
}
- entry = <-entries
- if entry.Err != nil {
- t.Fatal(entry.Err)
+ entry, ok = <-entries
+ if !ok {
+ t.Fatal("expected another entry")
}
if entry.Type != coreiface.TSymlink {
t.Errorf("wrong type %s", entry.Type)
@@ -716,11 +713,12 @@ func (tp *TestSuite) TestLs(t *testing.T) {
t.Errorf("expected symlink target to be /foo/bar, got %s", entry.Target)
}
- if l, ok := <-entries; ok {
- t.Errorf("didn't expect a second link")
- if l.Err != nil {
- t.Error(l.Err)
- }
+ _, ok = <-entries
+ if ok {
+ t.Errorf("didn't expect a another link")
+ }
+ if err = <-errCh; err != nil {
+ t.Error(err)
}
}
@@ -779,13 +777,17 @@ func (tp *TestSuite) TestLsEmptyDir(t *testing.T) {
t.Fatal(err)
}
- links, err := api.Unixfs().Ls(ctx, p)
- if err != nil {
+ links, errCh := api.Unixfs().Ls(ctx, p)
+ var count int
+ for range links {
+ count++
+ }
+ if err = <-errCh; err != nil {
t.Fatal(err)
}
- if len(links) != 0 {
- t.Fatalf("expected 0 links, got %d", len(links))
+ if count != 0 {
+ t.Fatalf("expected 0 links, got %d", count)
}
}
@@ -808,13 +810,17 @@ func (tp *TestSuite) TestLsNonUnixfs(t *testing.T) {
t.Fatal(err)
}
- links, err := api.Unixfs().Ls(ctx, path.FromCid(nd.Cid()))
- if err != nil {
+ links, errCh := api.Unixfs().Ls(ctx, path.FromCid(nd.Cid()))
+ var count int
+ for range links {
+ count++
+ }
+ if err = <-errCh; err != nil {
t.Fatal(err)
}
- if len(links) != 0 {
- t.Fatalf("expected 0 links, got %d", len(links))
+ if count != 0 {
+ t.Fatalf("expected 0 links, got %d", count)
}
}
diff --git a/core/coreiface/unixfs.go b/core/coreiface/unixfs.go
index c0150bd12c6..32bedd93ea2 100644
--- a/core/coreiface/unixfs.go
+++ b/core/coreiface/unixfs.go
@@ -63,8 +63,6 @@ type DirEntry struct {
Mode os.FileMode
ModTime time.Time
-
- Err error
}
// UnixfsAPI is the basic interface to immutable files in IPFS
@@ -81,7 +79,50 @@ type UnixfsAPI interface {
// to operations performed on the returned file
Get(context.Context, path.Path) (files.Node, error)
- // Ls returns the list of links in a directory. Links aren't guaranteed to be
- // returned in order
- Ls(context.Context, path.Path, ...options.UnixfsLsOption) (<-chan DirEntry, error)
+ // Ls returns the list of links in a directory. Links aren't guaranteed to
+ // be returned in order. If an error occurs, the DirEntry channel is closed
+ // and an error is output on the error channel. Both channels are closed if
+ // the context is canceled.
+ //
+ // Example:
+ //
+ // dirs, errs := Ls(ctx, p)
+ // for dirEnt := range dirs {
+ // fmt.Println("Dir name:", dirEnt.Name)
+ // }
+ // err := <-errs
+ // if err != nil {
+ // return fmt.Errorf("error listing directory: %w", err)
+ // }
+ Ls(context.Context, path.Path, ...options.UnixfsLsOption) (<-chan DirEntry, <-chan error)
+}
+
+/* TODO: Uncomment after go1.23 required.
+// LsIter returns a go iterator that allows ranging over DirEntry results.
+// Iteration stops if the context is canceled or if the iterator yields an
+// error.
+//
+// Exmaple:
+//
+// for dirEnt, err := LsIter(ctx, ufsAPI, p) {
+// if err != nil {
+// return fmt.Errorf("error listing directory: %w", err)
+// }
+// fmt.Println("Dir name:", dirEnt.Name)
+// }
+func LsIter(ctx context.Context, api UnixfsAPI, p path.Path, opts ...options.UnixfsLsOption) iter.Seq2[DirEntry, error] {
+ return func(yield func(DirEntry, error) bool) {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel() // cancel Ls if done iterating early
+ results, asyncErr := api.Ls(ctx, p, opts...)
+ for result := range results {
+ if !yield(result, nil) {
+ return
+ }
+ }
+ if err != <-asyncErr; err != nil {
+ yield(nil, err)
+ }
+ }
}
+*/
From 09815c11f1381bacd7997e3b624a06cc920ce998 Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Wed, 16 Oct 2024 05:03:38 -1000
Subject: [PATCH 2/6] Add LsIter to iterate items from UnixfsAPI interfaces
---
core/coreiface/unixfs.go | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/core/coreiface/unixfs.go b/core/coreiface/unixfs.go
index 32bedd93ea2..6bdcdfa5045 100644
--- a/core/coreiface/unixfs.go
+++ b/core/coreiface/unixfs.go
@@ -2,6 +2,7 @@ package iface
import (
"context"
+ "iter"
"os"
"time"
@@ -97,7 +98,6 @@ type UnixfsAPI interface {
Ls(context.Context, path.Path, ...options.UnixfsLsOption) (<-chan DirEntry, <-chan error)
}
-/* TODO: Uncomment after go1.23 required.
// LsIter returns a go iterator that allows ranging over DirEntry results.
// Iteration stops if the context is canceled or if the iterator yields an
// error.
@@ -120,9 +120,8 @@ func LsIter(ctx context.Context, api UnixfsAPI, p path.Path, opts ...options.Uni
return
}
}
- if err != <-asyncErr; err != nil {
- yield(nil, err)
+ if err := <-asyncErr; err != nil {
+ yield(DirEntry{}, err)
}
}
}
-*/
From 58f7a2520bd2ab453edf1aa5da00d58e4dc7ae62 Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Thu, 7 Nov 2024 07:57:37 -1000
Subject: [PATCH 3/6] wip
---
client/rpc/pin.go | 63 +++++--------
client/rpc/unixfs.go | 113 ++++++++++-------------
core/commands/ls.go | 17 +++-
core/commands/pin/pin.go | 12 ++-
core/commands/pin/remotepin.go | 20 ++--
core/coreapi/pin.go | 162 +++++++++++++++------------------
core/coreiface/pin.go | 5 +-
core/coreiface/tests/block.go | 13 ++-
core/coreiface/tests/pin.go | 34 ++++---
core/coreiface/tests/unixfs.go | 2 +-
10 files changed, 205 insertions(+), 236 deletions(-)
diff --git a/client/rpc/pin.go b/client/rpc/pin.go
index eb2aa316cf1..c63ca515ff9 100644
--- a/client/rpc/pin.go
+++ b/client/rpc/pin.go
@@ -62,16 +62,12 @@ type pinLsObject struct {
Type string
}
-func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, <-chan error) {
- pins := make(chan iface.Pin)
- errOut := make(chan error, 1)
+func (api *PinAPI) Ls(ctx context.Context, pins chan<- iface.Pin, opts ...caopts.PinLsOption) error {
+ defer close(pins)
options, err := caopts.PinLsOptions(opts...)
if err != nil {
- errOut <- err
- close(pins)
- close(errOut)
- return pins, errOut
+ return err
}
res, err := api.core().Request("pin/ls").
@@ -79,42 +75,33 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan i
Option("stream", true).
Send(ctx)
if err != nil {
- errOut <- err
- close(pins)
- close(errOut)
- return pins, errOut
+ return err
}
-
- go func(ch chan<- iface.Pin, errCh chan<- error) {
- defer res.Output.Close()
- defer close(ch)
- defer close(errCh)
-
- dec := json.NewDecoder(res.Output)
- var out pinLsObject
- for {
- err := dec.Decode(&out)
- if err != nil {
- if err != io.EOF {
- errCh <- err
- }
- return
+ defer res.Output.Close()
+
+ dec := json.NewDecoder(res.Output)
+ var out pinLsObject
+ for {
+ err := dec.Decode(&out)
+ if err != nil {
+ if err != io.EOF {
+ return err
}
+ return nil
+ }
- c, err := cid.Parse(out.Cid)
- if err != nil {
- errCh <- err
- return
- }
+ c, err := cid.Parse(out.Cid)
+ if err != nil {
+ return err
+ }
- select {
- case ch <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
- case <-ctx.Done():
- return
- }
+ select {
+ case pins <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
+ case <-ctx.Done():
+ return ctx.Err()
}
- }(pins, errOut)
- return pins, errOut
+ }
+ return nil
}
// IsPinned returns whether or not the given cid is pinned
diff --git a/client/rpc/unixfs.go b/client/rpc/unixfs.go
index c913db67a8f..70eb92a3a1d 100644
--- a/client/rpc/unixfs.go
+++ b/client/rpc/unixfs.go
@@ -144,16 +144,12 @@ type lsOutput struct {
Objects []lsObject
}
-func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, <-chan error) {
- out := make(chan iface.DirEntry)
- errOut := make(chan error, 1)
+func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirEntry, opts ...caopts.UnixfsLsOption) error {
+ defer close(out)
options, err := caopts.UnixfsLsOptions(opts...)
if err != nil {
- errOut <- err
- close(out)
- close(errOut)
- return out, errOut
+ return err
}
resp, err := api.core().Request("ls", p.String()).
@@ -162,79 +158,66 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs
Option("stream", true).
Send(ctx)
if err != nil {
- errOut <- err
- close(out)
- close(errOut)
- return out, errOut
+ return err
}
if resp.Error != nil {
- errOut <- resp.Error
- close(out)
- close(errOut)
- return out, errOut
+ return err
}
+ defer resp.Close()
dec := json.NewDecoder(resp.Output)
- go func() {
- defer resp.Close()
- defer close(out)
- defer close(errOut)
-
- for {
- var link lsOutput
- if err := dec.Decode(&link); err != nil {
- if err != io.EOF {
- errOut <- err
- }
- return
+ for {
+ var link lsOutput
+ if err = dec.Decode(&link); err != nil {
+ if err != io.EOF {
+ return err
}
+ return nil
+ }
- if len(link.Objects) != 1 {
- errOut <- errors.New("unexpected Objects len")
- return
- }
+ if len(link.Objects) != 1 {
+ return errors.New("unexpected Objects len")
+ }
- if len(link.Objects[0].Links) != 1 {
- errOut <- errors.New("unexpected Links len")
- return
- }
+ if len(link.Objects[0].Links) != 1 {
+ return errors.New("unexpected Links len")
+ }
- l0 := link.Objects[0].Links[0]
+ l0 := link.Objects[0].Links[0]
- c, err := cid.Decode(l0.Hash)
- if err != nil {
- errOut <- err
- return
- }
+ c, err := cid.Decode(l0.Hash)
+ if err != nil {
+ return err
+ }
- var ftype iface.FileType
- switch l0.Type {
- case unixfs.TRaw, unixfs.TFile:
- ftype = iface.TFile
- case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
- ftype = iface.TDirectory
- case unixfs.TSymlink:
- ftype = iface.TSymlink
- }
+ var ftype iface.FileType
+ switch l0.Type {
+ case unixfs.TRaw, unixfs.TFile:
+ ftype = iface.TFile
+ case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
+ ftype = iface.TDirectory
+ case unixfs.TSymlink:
+ ftype = iface.TSymlink
+ }
- select {
- case out <- iface.DirEntry{
- Name: l0.Name,
- Cid: c,
- Size: l0.Size,
- Type: ftype,
- Target: l0.Target,
-
- Mode: l0.Mode,
- ModTime: l0.ModTime,
- }:
- case <-ctx.Done():
- }
+ select {
+ case out <- iface.DirEntry{
+ Name: l0.Name,
+ Cid: c,
+ Size: l0.Size,
+ Type: ftype,
+ Target: l0.Target,
+
+ Mode: l0.Mode,
+ ModTime: l0.ModTime,
+ }:
+ case <-ctx.Done():
+ return ctx.Err()
}
- }()
+ }
- return out, errOut
+ return nil
}
func (api *UnixfsAPI) core() *HttpApi {
diff --git a/core/commands/ls.go b/core/commands/ls.go
index 15db7ff53d7..da1d9ebc530 100644
--- a/core/commands/ls.go
+++ b/core/commands/ls.go
@@ -133,14 +133,21 @@ The JSON output contains type information.
}
}
+ lsCtx, cancel := context.WithCancel(req.Context)
+ defer cancel()
+
for i, fpath := range paths {
pth, err := cmdutils.PathOrCidPath(fpath)
if err != nil {
return err
}
- results, errCh := api.Unixfs().Ls(req.Context, pth,
- options.Unixfs.ResolveChildren(resolveSize || resolveType))
+ results := make(chan iface.DirEntry)
+ var lsErr error
+ go func() {
+ lsErr = api.Unixfs().Ls(lsCtx, pth, results,
+ options.Unixfs.ResolveChildren(resolveSize || resolveType))
+ }()
processLink, dirDone = processDir()
for link := range results {
@@ -164,12 +171,12 @@ The JSON output contains type information.
Mode: link.Mode,
ModTime: link.ModTime,
}
- if err := processLink(paths[i], lsLink); err != nil {
+ if err = processLink(paths[i], lsLink); err != nil {
return err
}
}
- if err = <-errCh; err != nil {
- return err
+ if lsErr != nil {
+ return lsErr
}
dirDone(i)
}
diff --git a/core/commands/pin/pin.go b/core/commands/pin/pin.go
index bd9c43a8326..a73e352e268 100644
--- a/core/commands/pin/pin.go
+++ b/core/commands/pin/pin.go
@@ -557,7 +557,15 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
panic("unhandled pin type")
}
- pins, errCh := api.Pin().Ls(req.Context, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
+ var lsErr error
+ pins := make(chan iface.Pin)
+ lsCtx, cancel := context.WithCancel(req.Context)
+ defer cancel()
+
+ go func() {
+ lsErr = api.Pin().Ls(lsCtx, opt, pins, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
+ }()
+
for p := range pins {
err = emit(PinLsOutputWrapper{
PinLsObject: PinLsObject{
@@ -570,7 +578,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
return err
}
}
- return <-errCh
+ return lsErr
}
const (
diff --git a/core/commands/pin/remotepin.go b/core/commands/pin/remotepin.go
index 40cb515c5f3..7319bcd9644 100644
--- a/core/commands/pin/remotepin.go
+++ b/core/commands/pin/remotepin.go
@@ -313,7 +313,7 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states.
}
// Executes GET /pins/?query-with-filters
-func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (<-chan pinclient.PinStatusGetter, <-chan error) {
+func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client, out chan<- pinclient.PinStatusGetter) error {
opts := []pinclient.LsOption{}
if name, nameFound := req.Options[pinNameOptionName]; nameFound {
nameStr := name.(string)
@@ -326,12 +326,8 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (<-ch
for _, rawCID := range cidsRawArr {
parsedCID, err := cid.Decode(rawCID)
if err != nil {
- psCh := make(chan pinclient.PinStatusGetter)
- errCh := make(chan error, 1)
- errCh <- fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err)
- close(psCh)
- close(errCh)
- return psCh, errCh
+ close(out)
+ return fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err)
}
parsedCIDs = append(parsedCIDs, parsedCID)
}
@@ -343,19 +339,15 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (<-ch
for _, rawStatus := range statusRawArr {
s := pinclient.Status(rawStatus)
if s.String() == string(pinclient.StatusUnknown) {
- psCh := make(chan pinclient.PinStatusGetter)
- errCh := make(chan error, 1)
- errCh <- fmt.Errorf("status %q is not valid", rawStatus)
- close(psCh)
- close(errCh)
- return psCh, errCh
+ close(out)
+ return fmt.Errorf("status %q is not valid", rawStatus)
}
parsedStatuses = append(parsedStatuses, s)
}
opts = append(opts, pinclient.PinOpts.FilterStatus(parsedStatuses...))
}
- return c.Ls(ctx, opts...)
+ return c.Ls(ctx, out, opts...)
}
var rmRemotePinCmd = &cmds.Command{
diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go
index c343f4e25c9..eb3cdf3fec6 100644
--- a/core/coreapi/pin.go
+++ b/core/coreapi/pin.go
@@ -51,18 +51,14 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp
return api.pinning.Flush(ctx)
}
-func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan coreiface.Pin, <-chan error) {
+func (api *PinAPI) Ls(ctx context.Context, pins chan<- coreiface.Pin, opts ...caopts.PinLsOption) error {
ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Ls")
defer span.End()
settings, err := caopts.PinLsOptions(opts...)
if err != nil {
- outCh := make(chan coreiface.Pin)
- errCh := make(chan error, 1)
- errCh <- err
- close(outCh)
- close(errCh)
- return outCh, errCh
+ close(pins)
+ return err
}
span.SetAttributes(attribute.String("type", settings.Type))
@@ -70,15 +66,11 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan c
switch settings.Type {
case "all", "direct", "indirect", "recursive":
default:
- outCh := make(chan coreiface.Pin)
- errCh := make(chan error, 1)
- errCh <- fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type)
- close(outCh)
- close(errCh)
- return outCh, errCh
+ close(pins)
+ return fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type)
}
- return api.pinLsAll(ctx, settings.Type, settings.Detailed, settings.Name)
+ return api.pinLsAll(ctx, settings.Type, settings.Detailed, settings.Name, pins)
}
func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) {
@@ -283,8 +275,8 @@ func (p *pinInfo) Name() string {
//
// The caller must keep reading results until the channel is closed to prevent
// leaking the goroutine that is fetching pins.
-func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, name string) (<-chan coreiface.Pin, <-chan error) {
- out := make(chan coreiface.Pin, 1)
+func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, name string, out chan<- coreiface.Pin) error {
+ defer close(out)
emittedSet := cid.NewSet()
AddToResultKeys := func(c cid.Cid, pinName, typeStr string) error {
@@ -302,90 +294,82 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool,
return nil
}
- errOut := make(chan error, 1)
-
- go func() {
- defer close(out)
- defer close(errOut)
-
- var rkeys []cid.Cid
- var err error
- if typeStr == "recursive" || typeStr == "all" {
- for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
- if streamedCid.Err != nil {
- errOut <- streamedCid.Err
- return
- }
- if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "recursive"); err != nil {
- errOut <- err
- return
- }
- rkeys = append(rkeys, streamedCid.Pin.Key)
+ var rkeys []cid.Cid
+ var err error
+ if typeStr == "recursive" || typeStr == "all" {
+ for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
+ if streamedCid.Err != nil {
+ return streamedCid.Err
}
+ if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "recursive"); err != nil {
+ return err
+ }
+ rkeys = append(rkeys, streamedCid.Pin.Key)
}
- if typeStr == "direct" || typeStr == "all" {
- for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
- if streamedCid.Err != nil {
- errOut <- streamedCid.Err
- return
- }
- if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "direct"); err != nil {
- errOut <- err
- return
- }
+ }
+ if typeStr == "direct" || typeStr == "all" {
+ for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
+ if streamedCid.Err != nil {
+ return streamedCid.Err
+ }
+ if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "direct"); err != nil {
+ return err
}
}
- if typeStr == "indirect" {
- // We need to first visit the direct pins that have priority
- // without emitting them
-
- for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
- if streamedCid.Err != nil {
- errOut <- streamedCid.Err
- return
- }
- emittedSet.Add(streamedCid.Pin.Key)
+ }
+ if typeStr == "indirect" {
+ // We need to first visit the direct pins that have priority
+ // without emitting them
+
+ for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
+ if streamedCid.Err != nil {
+ return streamedCid.Err
}
+ emittedSet.Add(streamedCid.Pin.Key)
+ }
- for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
- if streamedCid.Err != nil {
- errOut <- streamedCid.Err
- return
- }
- emittedSet.Add(streamedCid.Pin.Key)
- rkeys = append(rkeys, streamedCid.Pin.Key)
+ for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
+ if streamedCid.Err != nil {
+ return streamedCid.Err
}
+ emittedSet.Add(streamedCid.Pin.Key)
+ rkeys = append(rkeys, streamedCid.Pin.Key)
+ }
+ }
+ if typeStr == "indirect" || typeStr == "all" {
+ if len(rkeys) == 0 {
+ return nil
}
- if typeStr == "indirect" || typeStr == "all" {
- walkingSet := cid.NewSet()
- for _, k := range rkeys {
- err = merkledag.Walk(
- ctx, merkledag.GetLinksWithDAG(api.dag), k,
- func(c cid.Cid) bool {
- if !walkingSet.Visit(c) {
- return false
- }
- if emittedSet.Has(c) {
- return true // skipped
- }
- err := AddToResultKeys(c, "", "indirect")
- if err != nil {
- errOut <- err
- return false
- }
- return true
- },
- merkledag.SkipRoot(), merkledag.Concurrent(),
- )
- if err != nil {
- errOut <- err
- return
- }
+ var addErr error
+ walkingSet := cid.NewSet()
+ for _, k := range rkeys {
+ err = merkledag.Walk(
+ ctx, merkledag.GetLinksWithDAG(api.dag), k,
+ func(c cid.Cid) bool {
+ if !walkingSet.Visit(c) {
+ return false
+ }
+ if emittedSet.Has(c) {
+ return true // skipped
+ }
+ addErr := AddToResultKeys(c, "", "indirect")
+ if addErr != nil {
+ return false
+ }
+ return true
+ },
+ merkledag.SkipRoot(), merkledag.Concurrent(),
+ )
+ if err != nil {
+ return err
+ }
+ if addErr != nil {
+ return addErr
}
}
- }()
+ }
- return out, errOut
+ return nil
}
func (api *PinAPI) core() coreiface.CoreAPI {
diff --git a/core/coreiface/pin.go b/core/coreiface/pin.go
index 39cfc5941da..e0fd2fb90ed 100644
--- a/core/coreiface/pin.go
+++ b/core/coreiface/pin.go
@@ -47,8 +47,9 @@ type PinAPI interface {
// tree
Add(context.Context, path.Path, ...options.PinAddOption) error
- // Ls returns list of pinned objects on this node
- Ls(context.Context, ...options.PinLsOption) (<-chan Pin, <-chan error)
+ // Ls returns this node's pinned objects on the provided channel. The
+ // channel is closed when there are no more pins and an error is returned.
+ Ls(context.Context, chan<- Pin, ...options.PinLsOption) error
// IsPinned returns whether or not the given cid is pinned
// and an explanation of why its pinned
diff --git a/core/coreiface/tests/block.go b/core/coreiface/tests/block.go
index f59bf1fa871..2b5a68a63b8 100644
--- a/core/coreiface/tests/block.go
+++ b/core/coreiface/tests/block.go
@@ -323,12 +323,15 @@ func (tp *TestSuite) TestBlockPin(t *testing.T) {
t.Fatal(err)
}
- pinsCh, errCh := api.Pin().Ls(ctx)
- _, ok := <-pinsCh
- if ok {
+ pinCh := make(chan coreiface.Pin)
+ go func() {
+ err = api.Pin().Ls(ctx, pinCh)
+ }()
+
+ for range pinCh {
t.Fatal("expected 0 pins")
}
- if err = <-errCh; err != nil {
+ if err != nil {
t.Fatal(err)
}
@@ -342,7 +345,7 @@ func (tp *TestSuite) TestBlockPin(t *testing.T) {
t.Fatal(err)
}
- pins, err := accPins(api.Pin().Ls(ctx))
+ pins, err := accPins(ctx, api)
if err != nil {
t.Fatal(err)
}
diff --git a/core/coreiface/tests/pin.go b/core/coreiface/tests/pin.go
index a134854931c..4c606323fec 100644
--- a/core/coreiface/tests/pin.go
+++ b/core/coreiface/tests/pin.go
@@ -67,7 +67,7 @@ func (tp *TestSuite) TestPinSimple(t *testing.T) {
t.Fatal(err)
}
- list, err := accPins(api.Pin().Ls(ctx))
+ list, err := accPins(ctx, api)
if err != nil {
t.Fatal(err)
}
@@ -91,7 +91,7 @@ func (tp *TestSuite) TestPinSimple(t *testing.T) {
t.Fatal(err)
}
- list, err = accPins(api.Pin().Ls(ctx))
+ list, err = accPins(ctx, api)
if err != nil {
t.Fatal(err)
}
@@ -143,7 +143,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) {
t.Fatal(err)
}
- list, err := accPins(api.Pin().Ls(ctx))
+ list, err := accPins(ctx, api)
if err != nil {
t.Fatal(err)
}
@@ -152,7 +152,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) {
t.Errorf("unexpected pin list len: %d", len(list))
}
- list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Direct()))
+ list, err = accPins(ctx, api, opt.Pin.Ls.Direct())
if err != nil {
t.Fatal(err)
}
@@ -165,7 +165,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) {
t.Errorf("unexpected path, %s != %s", list[0].Path().String(), path.FromCid(nd3.Cid()).String())
}
- list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Recursive()))
+ list, err = accPins(ctx, api, opt.Pin.Ls.Recursive())
if err != nil {
t.Fatal(err)
}
@@ -178,7 +178,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) {
t.Errorf("unexpected path, %s != %s", list[0].Path().String(), path.FromCid(nd2.Cid()).String())
}
- list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Indirect()))
+ list, err = accPins(ctx, api, opt.Pin.Ls.Indirect())
if err != nil {
t.Fatal(err)
}
@@ -436,21 +436,21 @@ func getThreeChainedNodes(t *testing.T, ctx context.Context, api iface.CoreAPI,
func assertPinTypes(t *testing.T, ctx context.Context, api iface.CoreAPI, recusive, direct, indirect []cidContainer) {
assertPinLsAllConsistency(t, ctx, api)
- list, err := accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Recursive()))
+ list, err := accPins(ctx, api, opt.Pin.Ls.Recursive())
if err != nil {
t.Fatal(err)
}
assertPinCids(t, list, recusive...)
- list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Direct()))
+ list, err = accPins(ctx, api, opt.Pin.Ls.Direct())
if err != nil {
t.Fatal(err)
}
assertPinCids(t, list, direct...)
- list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Indirect()))
+ list, err = accPins(ctx, api, opt.Pin.Ls.Indirect())
if err != nil {
t.Fatal(err)
}
@@ -500,7 +500,7 @@ func assertPinCids(t *testing.T, pins []iface.Pin, cids ...cidContainer) {
// assertPinLsAllConsistency verifies that listing all pins gives the same result as listing the pin types individually
func assertPinLsAllConsistency(t *testing.T, ctx context.Context, api iface.CoreAPI) {
t.Helper()
- allPins, err := accPins(api.Pin().Ls(ctx))
+ allPins, err := accPins(ctx, api)
if err != nil {
t.Fatal(err)
}
@@ -531,7 +531,7 @@ func assertPinLsAllConsistency(t *testing.T, ctx context.Context, api iface.Core
}
for typeStr, pinProps := range typeMap {
- pins, err := accPins(api.Pin().Ls(ctx, pinProps.PinLsOption))
+ pins, err := accPins(ctx, api, pinProps.PinLsOption)
if err != nil {
t.Fatal(err)
}
@@ -593,15 +593,19 @@ func assertNotPinned(t *testing.T, ctx context.Context, api iface.CoreAPI, p pat
}
}
-func accPins(pins <-chan iface.Pin, errs <-chan error) ([]iface.Pin, error) {
- var results []iface.Pin
+func accPins(ctx context.Context, api iface.CoreAPI, opts ...opt.PinLsOption) ([]iface.Pin, error) {
+ var err error
+ pins := make(chan iface.Pin)
+ go func() {
+ err = api.Pin().Ls(ctx, pins, opts...)
+ }()
+ var results []iface.Pin
for pin := range pins {
results = append(results, pin)
}
- if err := <-errs; err != nil {
+ if err != nil {
return nil, err
}
-
return results, nil
}
diff --git a/core/coreiface/tests/unixfs.go b/core/coreiface/tests/unixfs.go
index 1ff4f6f2c5a..e806ae9baef 100644
--- a/core/coreiface/tests/unixfs.go
+++ b/core/coreiface/tests/unixfs.go
@@ -544,7 +544,7 @@ func (tp *TestSuite) TestAddPinned(t *testing.T) {
t.Fatal(err)
}
- pins, err := accPins(api.Pin().Ls(ctx))
+ pins, err := accPins(ctx, api)
if err != nil {
t.Fatal(err)
}
From 10fee6d54910a8d690c5ec896eb9538f6751b654 Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Mon, 25 Nov 2024 12:36:56 -1000
Subject: [PATCH 4/6] APIs take chan argument and return error
---
client/rpc/pin.go | 2 +-
client/rpc/unixfs.go | 2 +-
core/commands/ls.go | 9 ++--
core/commands/pin/pin.go | 8 +--
core/commands/pin/remotepin.go | 53 ++++++++++++--------
core/coreapi/unixfs.go | 90 +++++++++++++---------------------
core/coreiface/tests/unixfs.go | 20 ++++++--
core/coreiface/unixfs.go | 30 +++++++-----
8 files changed, 115 insertions(+), 99 deletions(-)
diff --git a/client/rpc/pin.go b/client/rpc/pin.go
index c63ca515ff9..bceb5498cea 100644
--- a/client/rpc/pin.go
+++ b/client/rpc/pin.go
@@ -87,7 +87,7 @@ func (api *PinAPI) Ls(ctx context.Context, pins chan<- iface.Pin, opts ...caopts
if err != io.EOF {
return err
}
- return nil
+ break
}
c, err := cid.Parse(out.Cid)
diff --git a/client/rpc/unixfs.go b/client/rpc/unixfs.go
index 70eb92a3a1d..faa6086c240 100644
--- a/client/rpc/unixfs.go
+++ b/client/rpc/unixfs.go
@@ -173,7 +173,7 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirE
if err != io.EOF {
return err
}
- return nil
+ break
}
if len(link.Objects) != 1 {
diff --git a/core/commands/ls.go b/core/commands/ls.go
index da1d9ebc530..bdd475d96cb 100644
--- a/core/commands/ls.go
+++ b/core/commands/ls.go
@@ -1,6 +1,7 @@
package commands
import (
+ "context"
"fmt"
"io"
"os"
@@ -143,9 +144,9 @@ The JSON output contains type information.
}
results := make(chan iface.DirEntry)
- var lsErr error
+ lsErr := make(chan error, 1)
go func() {
- lsErr = api.Unixfs().Ls(lsCtx, pth, results,
+ lsErr <- api.Unixfs().Ls(lsCtx, pth, results,
options.Unixfs.ResolveChildren(resolveSize || resolveType))
}()
@@ -175,8 +176,8 @@ The JSON output contains type information.
return err
}
}
- if lsErr != nil {
- return lsErr
+ if err = <-lsErr; err != nil {
+ return err
}
dirDone(i)
}
diff --git a/core/commands/pin/pin.go b/core/commands/pin/pin.go
index a73e352e268..428a75b695d 100644
--- a/core/commands/pin/pin.go
+++ b/core/commands/pin/pin.go
@@ -557,13 +557,13 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
panic("unhandled pin type")
}
- var lsErr error
- pins := make(chan iface.Pin)
+ pins := make(chan coreiface.Pin)
+ lsErr := make(chan error, 1)
lsCtx, cancel := context.WithCancel(req.Context)
defer cancel()
go func() {
- lsErr = api.Pin().Ls(lsCtx, opt, pins, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
+ lsErr <- api.Pin().Ls(lsCtx, pins, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
}()
for p := range pins {
@@ -578,7 +578,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
return err
}
}
- return lsErr
+ return <-lsErr
}
const (
diff --git a/core/commands/pin/remotepin.go b/core/commands/pin/remotepin.go
index 7319bcd9644..fc852fbbfc6 100644
--- a/core/commands/pin/remotepin.go
+++ b/core/commands/pin/remotepin.go
@@ -293,14 +293,18 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states.
ctx, cancel := context.WithCancel(req.Context)
defer cancel()
- psCh, errCh := lsRemote(ctx, req, c)
+ psCh := make(chan pinclient.PinStatusGetter)
+ lsErr := make(chan error, 1)
+ go func() {
+ lsErr <- lsRemote(ctx, req, c, psCh)
+ }()
for ps := range psCh {
if err := res.Emit(toRemotePinOutput(ps)); err != nil {
return err
}
}
- return <-errCh
+ return <-lsErr
},
Type: RemotePinOutput{},
Encoders: cmds.EncoderMap{
@@ -347,7 +351,12 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client, out c
opts = append(opts, pinclient.PinOpts.FilterStatus(parsedStatuses...))
}
- return c.Ls(ctx, out, opts...)
+ rmtOut, rmtErr := c.Ls(ctx, opts...)
+ for p := range rmtOut {
+ out <- p
+ }
+ return <-rmtErr
+ //return c.Ls2(ctx, out, opts...)
}
var rmRemotePinCmd = &cmds.Command{
@@ -389,33 +398,37 @@ To list and then remove all pending pin requests, pass an explicit status list:
cmds.BoolOption(pinForceOptionName, "Allow removal of multiple pins matching the query without additional confirmation.").WithDefault(false),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
- ctx, cancel := context.WithCancel(req.Context)
- defer cancel()
-
c, err := getRemotePinServiceFromRequest(req, env)
if err != nil {
return err
}
rmIDs := []string{}
- if len(req.Arguments) == 0 {
- psCh, errCh := lsRemote(ctx, req, c)
- for ps := range psCh {
- rmIDs = append(rmIDs, ps.GetRequestId())
- }
- if err = <-errCh; err != nil {
- return fmt.Errorf("error while listing remote pins: %v", err)
- }
-
- if len(rmIDs) > 1 && !req.Options[pinForceOptionName].(bool) {
- return fmt.Errorf("multiple remote pins are matching this query, add --force to confirm the bulk removal")
- }
- } else {
+ if len(req.Arguments) != 0 {
return fmt.Errorf("unexpected argument %q", req.Arguments[0])
}
+ psCh := make(chan pinclient.PinStatusGetter)
+ errCh := make(chan error, 1)
+ ctx, cancel := context.WithCancel(req.Context)
+ defer cancel()
+
+ go func() {
+ errCh <- lsRemote(ctx, req, c, psCh)
+ }()
+ for ps := range psCh {
+ rmIDs = append(rmIDs, ps.GetRequestId())
+ }
+ if err = <-errCh; err != nil {
+ return fmt.Errorf("error while listing remote pins: %v", err)
+ }
+
+ if len(rmIDs) > 1 && !req.Options[pinForceOptionName].(bool) {
+ return fmt.Errorf("multiple remote pins are matching this query, add --force to confirm the bulk removal")
+ }
+
for _, rmID := range rmIDs {
- if err := c.DeleteByID(ctx, rmID); err != nil {
+ if err = c.DeleteByID(ctx, rmID); err != nil {
return fmt.Errorf("removing pin identified by requestid=%q failed: %v", rmID, err)
}
}
diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go
index b7134b0ffed..9d91f09b6ef 100644
--- a/core/coreapi/unixfs.go
+++ b/core/coreapi/unixfs.go
@@ -2,6 +2,7 @@ package coreapi
import (
"context"
+ "errors"
"fmt"
blockservice "github.com/ipfs/boxo/blockservice"
@@ -197,18 +198,15 @@ func (api *UnixfsAPI) Get(ctx context.Context, p path.Path) (files.Node, error)
// Ls returns the contents of an IPFS or IPNS object(s) at path p, with the format:
// ` `
-func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.UnixfsLsOption) (<-chan coreiface.DirEntry, <-chan error) {
+func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- coreiface.DirEntry, opts ...options.UnixfsLsOption) error {
ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "Ls", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()
+ defer close(out)
+
settings, err := options.UnixfsLsOptions(opts...)
if err != nil {
- errOut := make(chan error, 1)
- errOut <- err
- close(errOut)
- out := make(chan coreiface.DirEntry)
- close(out)
- return out, errOut
+ return err
}
span.SetAttributes(attribute.Bool("resolvechildren", settings.ResolveChildren))
@@ -218,28 +216,18 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.Unixf
dagnode, err := ses.ResolveNode(ctx, p)
if err != nil {
- errOut := make(chan error, 1)
- errOut <- err
- close(errOut)
- out := make(chan coreiface.DirEntry)
- close(out)
- return out, errOut
+ return err
}
dir, err := uio.NewDirectoryFromNode(ses.dag, dagnode)
if err != nil {
- if err == uio.ErrNotADir {
- return uses.lsFromLinks(ctx, dagnode.Links(), settings)
+ if errors.Is(err, uio.ErrNotADir) {
+ return uses.lsFromLinks(ctx, dagnode.Links(), settings, out)
}
- errOut := make(chan error, 1)
- errOut <- err
- close(errOut)
- out := make(chan coreiface.DirEntry)
- close(out)
- return out, errOut
+ return err
}
- return uses.lsFromLinksAsync(ctx, dir, settings)
+ return uses.lsFromDirLinks(ctx, dir, settings, out)
}
func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) (coreiface.DirEntry, error) {
@@ -300,55 +288,47 @@ func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, se
return lnk, nil
}
-func (api *UnixfsAPI) lsFromLinksAsync(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, <-chan error) {
- out := make(chan coreiface.DirEntry, uio.DefaultShardWidth)
- errOut := make(chan error, 1)
-
- go func() {
- defer close(out)
- defer close(errOut)
-
- for l := range dir.EnumLinksAsync(ctx) {
- dirEnt, err := api.processLink(ctx, l, settings) // TODO: perf: processing can be done in background and in parallel
- if err != nil {
- errOut <- err
- return
- }
- select {
- case out <- dirEnt:
- case <-ctx.Done():
- return
- }
+func (api *UnixfsAPI) lsFromDirLinks(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings, out chan<- coreiface.DirEntry) error {
+ for l := range dir.EnumLinksAsync(ctx) {
+ dirEnt, err := api.processLink(ctx, l, settings) // TODO: perf: processing can be done in background and in parallel
+ if err != nil {
+ return err
}
- }()
-
- return out, errOut
+ select {
+ case out <- dirEnt:
+ case <-ctx.Done():
+ return nil
+ }
+ }
+ return nil
}
-func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, <-chan error) {
- out := make(chan coreiface.DirEntry, uio.DefaultShardWidth)
- errOut := make(chan error, 1)
-
+func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings, out chan<- coreiface.DirEntry) error {
+ // Create links channel large enough to not block when writing to out is slower.
+ links := make(chan coreiface.DirEntry, len(ndlinks))
+ errs := make(chan error, 1)
go func() {
- defer close(out)
- defer close(errOut)
-
+ defer close(links)
+ defer close(errs)
for _, l := range ndlinks {
lr := ft.LinkResult{Link: &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}}
- dirEnt, err := api.processLink(ctx, lr, settings) // TODO: perf: processing can be done in background and in parallel
+ lnk, err := api.processLink(ctx, lr, settings) // TODO: can be parallel if settings.Async
if err != nil {
- errOut <- err
+ errs <- err
return
}
select {
- case out <- dirEnt:
+ case links <- lnk:
case <-ctx.Done():
return
}
}
}()
- return out, errOut
+ for lnk := range links {
+ out <- lnk
+ }
+ return <-errs
}
func (api *UnixfsAPI) core() *CoreAPI {
diff --git a/core/coreiface/tests/unixfs.go b/core/coreiface/tests/unixfs.go
index e806ae9baef..987d39b2620 100644
--- a/core/coreiface/tests/unixfs.go
+++ b/core/coreiface/tests/unixfs.go
@@ -681,7 +681,11 @@ func (tp *TestSuite) TestLs(t *testing.T) {
t.Fatal(err)
}
- entries, errCh := api.Unixfs().Ls(ctx, p)
+ errCh := make(chan error, 1)
+ entries := make(chan coreiface.DirEntry)
+ go func() {
+ errCh <- api.Unixfs().Ls(ctx, p, entries)
+ }()
entry, ok := <-entries
if !ok {
@@ -777,7 +781,12 @@ func (tp *TestSuite) TestLsEmptyDir(t *testing.T) {
t.Fatal(err)
}
- links, errCh := api.Unixfs().Ls(ctx, p)
+ errCh := make(chan error, 1)
+ links := make(chan coreiface.DirEntry)
+ go func() {
+ errCh <- api.Unixfs().Ls(ctx, p, links)
+ }()
+
var count int
for range links {
count++
@@ -810,7 +819,12 @@ func (tp *TestSuite) TestLsNonUnixfs(t *testing.T) {
t.Fatal(err)
}
- links, errCh := api.Unixfs().Ls(ctx, path.FromCid(nd.Cid()))
+ errCh := make(chan error, 1)
+ links := make(chan coreiface.DirEntry)
+ go func() {
+ errCh <- api.Unixfs().Ls(ctx, path.FromCid(nd.Cid()), links)
+ }()
+
var count int
for range links {
count++
diff --git a/core/coreiface/unixfs.go b/core/coreiface/unixfs.go
index 6bdcdfa5045..10371998c20 100644
--- a/core/coreiface/unixfs.go
+++ b/core/coreiface/unixfs.go
@@ -80,22 +80,25 @@ type UnixfsAPI interface {
// to operations performed on the returned file
Get(context.Context, path.Path) (files.Node, error)
- // Ls returns the list of links in a directory. Links aren't guaranteed to
- // be returned in order. If an error occurs, the DirEntry channel is closed
- // and an error is output on the error channel. Both channels are closed if
- // the context is canceled.
+ // Ls writes the links in a directory to the DirEntry channel. Links aren't
+ // guaranteed to be returned in order. If an error occurs or the context is
+ // canceled, the DirEntry channel is closed and an error is returned.
//
// Example:
//
- // dirs, errs := Ls(ctx, p)
+ // dirs := make(chan DirEntry)
+ // lsErr := make(chan error, 1)
+ // go func() {
+ // lsErr <- Ls(ctx, p, dirs)
+ // }()
// for dirEnt := range dirs {
// fmt.Println("Dir name:", dirEnt.Name)
// }
- // err := <-errs
+ // err := <-lsErr
// if err != nil {
// return fmt.Errorf("error listing directory: %w", err)
// }
- Ls(context.Context, path.Path, ...options.UnixfsLsOption) (<-chan DirEntry, <-chan error)
+ Ls(context.Context, path.Path, chan<- DirEntry, ...options.UnixfsLsOption) error
}
// LsIter returns a go iterator that allows ranging over DirEntry results.
@@ -114,13 +117,18 @@ func LsIter(ctx context.Context, api UnixfsAPI, p path.Path, opts ...options.Uni
return func(yield func(DirEntry, error) bool) {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // cancel Ls if done iterating early
- results, asyncErr := api.Ls(ctx, p, opts...)
- for result := range results {
- if !yield(result, nil) {
+
+ dirs := make(chan DirEntry)
+ lsErr := make(chan error, 1)
+ go func() {
+ lsErr <- api.Ls(ctx, p, dirs, opts...)
+ }()
+ for dirEnt := range dirs {
+ if !yield(dirEnt, nil) {
return
}
}
- if err := <-asyncErr; err != nil {
+ if err := <-lsErr; err != nil {
yield(DirEntry{}, err)
}
}
From 8ac3d5b8715809ab08e5389ee5d3785e234e28d1 Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Mon, 25 Nov 2024 13:26:36 -1000
Subject: [PATCH 5/6] fix sharowed variable error
---
core/coreapi/pin.go | 7 ++-----
1 file changed, 2 insertions(+), 5 deletions(-)
diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go
index eb3cdf3fec6..878b4c28d0a 100644
--- a/core/coreapi/pin.go
+++ b/core/coreapi/pin.go
@@ -352,11 +352,8 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool,
if emittedSet.Has(c) {
return true // skipped
}
- addErr := AddToResultKeys(c, "", "indirect")
- if addErr != nil {
- return false
- }
- return true
+ addErr = AddToResultKeys(c, "", "indirect")
+ return addErr == nil
},
merkledag.SkipRoot(), merkledag.Concurrent(),
)
From 22f399d601f14c93370fca1553943426acbd2528 Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Wed, 27 Nov 2024 07:37:34 -1000
Subject: [PATCH 6/6] boxo with Ls2
---
core/commands/pin/remotepin.go | 14 ++++++++------
docs/examples/kubo-as-a-library/go.mod | 4 +++-
docs/examples/kubo-as-a-library/go.sum | 8 ++++++--
go.mod | 4 +++-
go.sum | 8 ++++++--
test/dependencies/go.mod | 2 +-
test/dependencies/go.sum | 8 ++++++--
7 files changed, 33 insertions(+), 15 deletions(-)
diff --git a/core/commands/pin/remotepin.go b/core/commands/pin/remotepin.go
index fc852fbbfc6..5c9bd5a954c 100644
--- a/core/commands/pin/remotepin.go
+++ b/core/commands/pin/remotepin.go
@@ -351,12 +351,14 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client, out c
opts = append(opts, pinclient.PinOpts.FilterStatus(parsedStatuses...))
}
- rmtOut, rmtErr := c.Ls(ctx, opts...)
- for p := range rmtOut {
- out <- p
- }
- return <-rmtErr
- //return c.Ls2(ctx, out, opts...)
+ /*
+ rmtOut, rmtErr := c.Ls(ctx, opts...)
+ for p := range rmtOut {
+ out <- p
+ }
+ return <-rmtErr
+ */
+ return c.Ls2(ctx, out, opts...)
}
var rmRemotePinCmd = &cmds.Command{
diff --git a/docs/examples/kubo-as-a-library/go.mod b/docs/examples/kubo-as-a-library/go.mod
index 3c025040ccf..ee370a16a5b 100644
--- a/docs/examples/kubo-as-a-library/go.mod
+++ b/docs/examples/kubo-as-a-library/go.mod
@@ -7,7 +7,7 @@ go 1.23
replace github.com/ipfs/kubo => ./../../..
require (
- github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1
+ github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d
github.com/ipfs/kubo v0.0.0-00010101000000-000000000000
github.com/libp2p/go-libp2p v0.37.2
github.com/multiformats/go-multiaddr v0.13.0
@@ -52,6 +52,8 @@ require (
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.6 // indirect
+ github.com/gammazero/chanqueue v1.0.0 // indirect
+ github.com/gammazero/deque v1.0.0 // indirect
github.com/getsentry/sentry-go v0.27.0 // indirect
github.com/go-jose/go-jose/v4 v4.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
diff --git a/docs/examples/kubo-as-a-library/go.sum b/docs/examples/kubo-as-a-library/go.sum
index 110c9bf8e17..4b218eb481a 100644
--- a/docs/examples/kubo-as-a-library/go.sum
+++ b/docs/examples/kubo-as-a-library/go.sum
@@ -164,6 +164,10 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc=
github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc=
+github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o=
+github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
+github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
+github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps=
github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@@ -298,8 +302,8 @@ github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c h1:7Uy
github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c/go.mod h1:6EekK/jo+TynwSE/ZOiOJd4eEvRXoavEC3vquKtv4yI=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
-github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 h1:Ox1qTlON8qG46rUL7dDEwnIt7W9MhaidtvR/97RywWw=
-github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1/go.mod h1:Kxk43F+avGAsJSwhJW4isNYrpGwXHRJCvJ19Pt+MQc4=
+github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d h1:UgBskn6bIS4Qf2GrRkDDwMRYhMaPiwHuCSW1cp0m62Y=
+github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d/go.mod h1:lAoydO+oJhB1e7pUn4ju1Z1fuUIwy+zb0hQXRb/bu2g=
github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ=
diff --git a/go.mod b/go.mod
index a9bca9d7f33..b377bae5f12 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ require (
github.com/hashicorp/go-version v1.7.0
github.com/ipfs-shipyard/nopfs v0.0.12
github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c
- github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1
+ github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d
github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-cidutil v0.1.0
@@ -125,6 +125,8 @@ require (
github.com/flynn/noise v1.1.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/gabriel-vasile/mimetype v1.4.6 // indirect
+ github.com/gammazero/chanqueue v1.0.0 // indirect
+ github.com/gammazero/deque v1.0.0 // indirect
github.com/getsentry/sentry-go v0.27.0 // indirect
github.com/go-jose/go-jose/v4 v4.0.4 // indirect
github.com/go-kit/log v0.2.1 // indirect
diff --git a/go.sum b/go.sum
index c29dbd816bf..4eb0aaf50f8 100644
--- a/go.sum
+++ b/go.sum
@@ -198,6 +198,10 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc=
github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc=
+github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o=
+github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
+github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
+github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps=
github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@@ -362,8 +366,8 @@ github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c h1:7Uy
github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c/go.mod h1:6EekK/jo+TynwSE/ZOiOJd4eEvRXoavEC3vquKtv4yI=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
-github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 h1:Ox1qTlON8qG46rUL7dDEwnIt7W9MhaidtvR/97RywWw=
-github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1/go.mod h1:Kxk43F+avGAsJSwhJW4isNYrpGwXHRJCvJ19Pt+MQc4=
+github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d h1:UgBskn6bIS4Qf2GrRkDDwMRYhMaPiwHuCSW1cp0m62Y=
+github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d/go.mod h1:lAoydO+oJhB1e7pUn4ju1Z1fuUIwy+zb0hQXRb/bu2g=
github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ=
diff --git a/test/dependencies/go.mod b/test/dependencies/go.mod
index 93ab3f923e0..7cac621284d 100644
--- a/test/dependencies/go.mod
+++ b/test/dependencies/go.mod
@@ -119,7 +119,7 @@ require (
github.com/huin/goupnp v1.3.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
- github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 // indirect
+ github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d // indirect
github.com/ipfs/go-block-format v0.2.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/ipfs/go-datastore v0.6.0 // indirect
diff --git a/test/dependencies/go.sum b/test/dependencies/go.sum
index 11be0c149fd..4609170a790 100644
--- a/test/dependencies/go.sum
+++ b/test/dependencies/go.sum
@@ -162,6 +162,10 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fzipp/gocyclo v0.6.0 h1:lsblElZG7d3ALtGMx9fmxeTKZaLLpU8mET09yN4BBLo=
github.com/fzipp/gocyclo v0.6.0/go.mod h1:rXPyn8fnlpa0R2csP/31uerbiVBugk5whMdlyaLkLoA=
+github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o=
+github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
+github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
+github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/ghostiam/protogetter v0.3.6 h1:R7qEWaSgFCsy20yYHNIJsU9ZOb8TziSRRxuAOTVKeOk=
github.com/ghostiam/protogetter v0.3.6/go.mod h1:7lpeDnEJ1ZjL/YtyoN99ljO4z0pd3H0d18/t2dPBxHw=
@@ -318,8 +322,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
-github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 h1:Ox1qTlON8qG46rUL7dDEwnIt7W9MhaidtvR/97RywWw=
-github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1/go.mod h1:Kxk43F+avGAsJSwhJW4isNYrpGwXHRJCvJ19Pt+MQc4=
+github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d h1:UgBskn6bIS4Qf2GrRkDDwMRYhMaPiwHuCSW1cp0m62Y=
+github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d/go.mod h1:lAoydO+oJhB1e7pUn4ju1Z1fuUIwy+zb0hQXRb/bu2g=
github.com/ipfs/go-block-format v0.2.0 h1:ZqrkxBA2ICbDRbK8KJs/u0O3dlp6gmAuuXUJNiW1Ycs=
github.com/ipfs/go-block-format v0.2.0/go.mod h1:+jpL11nFx5A/SPpsoBn6Bzkra/zaArfSmsknbPMYgzM=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=