From 940dde4517c188b6b67e4f5097d1152cae7ae6b6 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Sun, 29 Sep 2024 03:02:13 -0700 Subject: [PATCH 1/6] Return errors using channles and not embedded in result type Asynchronous functions should return errors over channels instead of embedding the error in the result type. Closes #9974 --- client/rpc/pin.go | 46 +++++++++--------- client/rpc/unixfs.go | 47 +++++++++---------- core/commands/ls.go | 11 ++--- core/commands/pin/pin.go | 12 +---- core/commands/pin/remotepin.go | 35 +++++++------- core/coreapi/pin.go | 48 +++++++++++-------- core/coreapi/unixfs.go | 86 ++++++++++++++++++++++++---------- core/coreiface/pin.go | 5 +- core/coreiface/tests/block.go | 7 ++- core/coreiface/tests/pin.go | 18 +++---- core/coreiface/tests/unixfs.go | 52 +++++++++++--------- core/coreiface/unixfs.go | 51 ++++++++++++++++++-- 12 files changed, 248 insertions(+), 170 deletions(-) diff --git a/client/rpc/pin.go b/client/rpc/pin.go index 6e8e942ac0e..eb2aa316cf1 100644 --- a/client/rpc/pin.go +++ b/client/rpc/pin.go @@ -62,10 +62,16 @@ type pinLsObject struct { Type string } -func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, error) { +func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, <-chan error) { + pins := make(chan iface.Pin) + errOut := make(chan error, 1) + options, err := caopts.PinLsOptions(opts...) if err != nil { - return nil, err + errOut <- err + close(pins) + close(errOut) + return pins, errOut } res, err := api.core().Request("pin/ls"). @@ -73,38 +79,32 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan i Option("stream", true). Send(ctx) if err != nil { - return nil, err + errOut <- err + close(pins) + close(errOut) + return pins, errOut } - pins := make(chan iface.Pin) - go func(ch chan<- iface.Pin) { + go func(ch chan<- iface.Pin, errCh chan<- error) { defer res.Output.Close() defer close(ch) + defer close(errCh) dec := json.NewDecoder(res.Output) var out pinLsObject for { - switch err := dec.Decode(&out); err { - case nil: - case io.EOF: - return - default: - select { - case ch <- pin{err: err}: - return - case <-ctx.Done(): - return + err := dec.Decode(&out) + if err != nil { + if err != io.EOF { + errCh <- err } + return } c, err := cid.Parse(out.Cid) if err != nil { - select { - case ch <- pin{err: err}: - return - case <-ctx.Done(): - return - } + errCh <- err + return } select { @@ -113,8 +113,8 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan i return } } - }(pins) - return pins, nil + }(pins, errOut) + return pins, errOut } // IsPinned returns whether or not the given cid is pinned diff --git a/client/rpc/unixfs.go b/client/rpc/unixfs.go index 3ba2c1c15f0..c913db67a8f 100644 --- a/client/rpc/unixfs.go +++ b/client/rpc/unixfs.go @@ -144,10 +144,16 @@ type lsOutput struct { Objects []lsObject } -func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, error) { +func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, <-chan error) { + out := make(chan iface.DirEntry) + errOut := make(chan error, 1) + options, err := caopts.UnixfsLsOptions(opts...) if err != nil { - return nil, err + errOut <- err + close(out) + close(errOut) + return out, errOut } resp, err := api.core().Request("ls", p.String()). @@ -156,45 +162,41 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs Option("stream", true). Send(ctx) if err != nil { - return nil, err + errOut <- err + close(out) + close(errOut) + return out, errOut } if resp.Error != nil { - return nil, resp.Error + errOut <- resp.Error + close(out) + close(errOut) + return out, errOut } dec := json.NewDecoder(resp.Output) - out := make(chan iface.DirEntry) go func() { defer resp.Close() defer close(out) + defer close(errOut) for { var link lsOutput if err := dec.Decode(&link); err != nil { - if err == io.EOF { - return - } - select { - case out <- iface.DirEntry{Err: err}: - case <-ctx.Done(): + if err != io.EOF { + errOut <- err } return } if len(link.Objects) != 1 { - select { - case out <- iface.DirEntry{Err: errors.New("unexpected Objects len")}: - case <-ctx.Done(): - } + errOut <- errors.New("unexpected Objects len") return } if len(link.Objects[0].Links) != 1 { - select { - case out <- iface.DirEntry{Err: errors.New("unexpected Links len")}: - case <-ctx.Done(): - } + errOut <- errors.New("unexpected Links len") return } @@ -202,10 +204,7 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs c, err := cid.Decode(l0.Hash) if err != nil { - select { - case out <- iface.DirEntry{Err: err}: - case <-ctx.Done(): - } + errOut <- err return } @@ -235,7 +234,7 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs } }() - return out, nil + return out, errOut } func (api *UnixfsAPI) core() *HttpApi { diff --git a/core/commands/ls.go b/core/commands/ls.go index ab914bb0e15..15db7ff53d7 100644 --- a/core/commands/ls.go +++ b/core/commands/ls.go @@ -139,17 +139,11 @@ The JSON output contains type information. return err } - results, err := api.Unixfs().Ls(req.Context, pth, + results, errCh := api.Unixfs().Ls(req.Context, pth, options.Unixfs.ResolveChildren(resolveSize || resolveType)) - if err != nil { - return err - } processLink, dirDone = processDir() for link := range results { - if link.Err != nil { - return link.Err - } var ftype unixfs_pb.Data_DataType switch link.Type { case iface.TFile: @@ -174,6 +168,9 @@ The JSON output contains type information. return err } } + if err = <-errCh; err != nil { + return err + } dirDone(i) } return done() diff --git a/core/commands/pin/pin.go b/core/commands/pin/pin.go index b87760aaff4..bd9c43a8326 100644 --- a/core/commands/pin/pin.go +++ b/core/commands/pin/pin.go @@ -557,15 +557,8 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api panic("unhandled pin type") } - pins, err := api.Pin().Ls(req.Context, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name)) - if err != nil { - return err - } - + pins, errCh := api.Pin().Ls(req.Context, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name)) for p := range pins { - if err := p.Err(); err != nil { - return err - } err = emit(PinLsOutputWrapper{ PinLsObject: PinLsObject{ Type: p.Type(), @@ -577,8 +570,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api return err } } - - return nil + return <-errCh } const ( diff --git a/core/commands/pin/remotepin.go b/core/commands/pin/remotepin.go index 3721913e77c..40cb515c5f3 100644 --- a/core/commands/pin/remotepin.go +++ b/core/commands/pin/remotepin.go @@ -285,19 +285,15 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states. cmds.DelimitedStringsOption(",", pinStatusOptionName, "Return pins with the specified statuses (queued,pinning,pinned,failed).").WithDefault([]string{"pinned"}), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - ctx, cancel := context.WithCancel(req.Context) - defer cancel() - c, err := getRemotePinServiceFromRequest(req, env) if err != nil { return err } - psCh, errCh, err := lsRemote(ctx, req, c) - if err != nil { - return err - } + ctx, cancel := context.WithCancel(req.Context) + defer cancel() + psCh, errCh := lsRemote(ctx, req, c) for ps := range psCh { if err := res.Emit(toRemotePinOutput(ps)); err != nil { return err @@ -317,7 +313,7 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states. } // Executes GET /pins/?query-with-filters -func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (chan pinclient.PinStatusGetter, chan error, error) { +func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (<-chan pinclient.PinStatusGetter, <-chan error) { opts := []pinclient.LsOption{} if name, nameFound := req.Options[pinNameOptionName]; nameFound { nameStr := name.(string) @@ -330,7 +326,12 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (chan for _, rawCID := range cidsRawArr { parsedCID, err := cid.Decode(rawCID) if err != nil { - return nil, nil, fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err) + psCh := make(chan pinclient.PinStatusGetter) + errCh := make(chan error, 1) + errCh <- fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err) + close(psCh) + close(errCh) + return psCh, errCh } parsedCIDs = append(parsedCIDs, parsedCID) } @@ -342,16 +343,19 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (chan for _, rawStatus := range statusRawArr { s := pinclient.Status(rawStatus) if s.String() == string(pinclient.StatusUnknown) { - return nil, nil, fmt.Errorf("status %q is not valid", rawStatus) + psCh := make(chan pinclient.PinStatusGetter) + errCh := make(chan error, 1) + errCh <- fmt.Errorf("status %q is not valid", rawStatus) + close(psCh) + close(errCh) + return psCh, errCh } parsedStatuses = append(parsedStatuses, s) } opts = append(opts, pinclient.PinOpts.FilterStatus(parsedStatuses...)) } - psCh, errCh := c.Ls(ctx, opts...) - - return psCh, errCh, nil + return c.Ls(ctx, opts...) } var rmRemotePinCmd = &cmds.Command{ @@ -403,10 +407,7 @@ To list and then remove all pending pin requests, pass an explicit status list: rmIDs := []string{} if len(req.Arguments) == 0 { - psCh, errCh, err := lsRemote(ctx, req, c) - if err != nil { - return err - } + psCh, errCh := lsRemote(ctx, req, c) for ps := range psCh { rmIDs = append(rmIDs, ps.GetRequestId()) } diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 64c65b651c2..c343f4e25c9 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -51,13 +51,18 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp return api.pinning.Flush(ctx) } -func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan coreiface.Pin, error) { +func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan coreiface.Pin, <-chan error) { ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Ls") defer span.End() settings, err := caopts.PinLsOptions(opts...) if err != nil { - return nil, err + outCh := make(chan coreiface.Pin) + errCh := make(chan error, 1) + errCh <- err + close(outCh) + close(errCh) + return outCh, errCh } span.SetAttributes(attribute.String("type", settings.Type)) @@ -65,10 +70,15 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan c switch settings.Type { case "all", "direct", "indirect", "recursive": default: - return nil, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type) + outCh := make(chan coreiface.Pin) + errCh := make(chan error, 1) + errCh <- fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type) + close(outCh) + close(errCh) + return outCh, errCh } - return api.pinLsAll(ctx, settings.Type, settings.Detailed, settings.Name), nil + return api.pinLsAll(ctx, settings.Type, settings.Detailed, settings.Name) } func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) { @@ -230,6 +240,7 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro } out := make(chan coreiface.PinStatus) + go func() { defer close(out) for p := range api.pinning.RecursiveKeys(ctx, false) { @@ -254,7 +265,6 @@ type pinInfo struct { pinType string path path.ImmutablePath name string - err error } func (p *pinInfo) Path() path.ImmutablePath { @@ -269,17 +279,12 @@ func (p *pinInfo) Name() string { return p.name } -func (p *pinInfo) Err() error { - return p.err -} - // pinLsAll is an internal function for returning a list of pins // // The caller must keep reading results until the channel is closed to prevent // leaking the goroutine that is fetching pins. -func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, name string) <-chan coreiface.Pin { +func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, name string) (<-chan coreiface.Pin, <-chan error) { out := make(chan coreiface.Pin, 1) - emittedSet := cid.NewSet() AddToResultKeys := func(c cid.Cid, pinName, typeStr string) error { @@ -297,19 +302,22 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, return nil } + errOut := make(chan error, 1) + go func() { defer close(out) + defer close(errOut) var rkeys []cid.Cid var err error if typeStr == "recursive" || typeStr == "all" { for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) { if streamedCid.Err != nil { - out <- &pinInfo{err: streamedCid.Err} + errOut <- streamedCid.Err return } if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "recursive"); err != nil { - out <- &pinInfo{err: err} + errOut <- err return } rkeys = append(rkeys, streamedCid.Pin.Key) @@ -318,11 +326,11 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, if typeStr == "direct" || typeStr == "all" { for streamedCid := range api.pinning.DirectKeys(ctx, detailed) { if streamedCid.Err != nil { - out <- &pinInfo{err: streamedCid.Err} + errOut <- streamedCid.Err return } if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "direct"); err != nil { - out <- &pinInfo{err: err} + errOut <- err return } } @@ -333,7 +341,7 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, for streamedCid := range api.pinning.DirectKeys(ctx, detailed) { if streamedCid.Err != nil { - out <- &pinInfo{err: streamedCid.Err} + errOut <- streamedCid.Err return } emittedSet.Add(streamedCid.Pin.Key) @@ -341,7 +349,7 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) { if streamedCid.Err != nil { - out <- &pinInfo{err: streamedCid.Err} + errOut <- streamedCid.Err return } emittedSet.Add(streamedCid.Pin.Key) @@ -362,7 +370,7 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, } err := AddToResultKeys(c, "", "indirect") if err != nil { - out <- &pinInfo{err: err} + errOut <- err return false } return true @@ -370,14 +378,14 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, merkledag.SkipRoot(), merkledag.Concurrent(), ) if err != nil { - out <- &pinInfo{err: err} + errOut <- err return } } } }() - return out + return out, errOut } func (api *PinAPI) core() coreiface.CoreAPI { diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index dbeeefda426..b7134b0ffed 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -197,13 +197,18 @@ func (api *UnixfsAPI) Get(ctx context.Context, p path.Path) (files.Node, error) // Ls returns the contents of an IPFS or IPNS object(s) at path p, with the format: // ` ` -func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.UnixfsLsOption) (<-chan coreiface.DirEntry, error) { +func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.UnixfsLsOption) (<-chan coreiface.DirEntry, <-chan error) { ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "Ls", trace.WithAttributes(attribute.String("path", p.String()))) defer span.End() settings, err := options.UnixfsLsOptions(opts...) if err != nil { - return nil, err + errOut := make(chan error, 1) + errOut <- err + close(errOut) + out := make(chan coreiface.DirEntry) + close(out) + return out, errOut } span.SetAttributes(attribute.Bool("resolvechildren", settings.ResolveChildren)) @@ -213,21 +218,31 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.Unixf dagnode, err := ses.ResolveNode(ctx, p) if err != nil { - return nil, err + errOut := make(chan error, 1) + errOut <- err + close(errOut) + out := make(chan coreiface.DirEntry) + close(out) + return out, errOut } dir, err := uio.NewDirectoryFromNode(ses.dag, dagnode) - if err == uio.ErrNotADir { - return uses.lsFromLinks(ctx, dagnode.Links(), settings) - } if err != nil { - return nil, err + if err == uio.ErrNotADir { + return uses.lsFromLinks(ctx, dagnode.Links(), settings) + } + errOut := make(chan error, 1) + errOut <- err + close(errOut) + out := make(chan coreiface.DirEntry) + close(out) + return out, errOut } return uses.lsFromLinksAsync(ctx, dir, settings) } -func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) coreiface.DirEntry { +func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) (coreiface.DirEntry, error) { ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "ProcessLink") defer span.End() if linkres.Link != nil { @@ -235,7 +250,7 @@ func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, se } if linkres.Err != nil { - return coreiface.DirEntry{Err: linkres.Err} + return coreiface.DirEntry{}, linkres.Err } lnk := coreiface.DirEntry{ @@ -252,15 +267,13 @@ func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, se if settings.ResolveChildren { linkNode, err := linkres.Link.GetNode(ctx, api.dag) if err != nil { - lnk.Err = err - break + return coreiface.DirEntry{}, err } if pn, ok := linkNode.(*merkledag.ProtoNode); ok { d, err := ft.FSNodeFromBytes(pn.Data()) if err != nil { - lnk.Err = err - break + return coreiface.DirEntry{}, err } switch d.Type() { case ft.TFile, ft.TRaw: @@ -284,35 +297,58 @@ func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, se } } - return lnk + return lnk, nil } -func (api *UnixfsAPI) lsFromLinksAsync(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, error) { +func (api *UnixfsAPI) lsFromLinksAsync(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, <-chan error) { out := make(chan coreiface.DirEntry, uio.DefaultShardWidth) + errOut := make(chan error, 1) go func() { defer close(out) + defer close(errOut) + for l := range dir.EnumLinksAsync(ctx) { + dirEnt, err := api.processLink(ctx, l, settings) // TODO: perf: processing can be done in background and in parallel + if err != nil { + errOut <- err + return + } select { - case out <- api.processLink(ctx, l, settings): // TODO: perf: processing can be done in background and in parallel + case out <- dirEnt: case <-ctx.Done(): return } } }() - return out, nil + return out, errOut } -func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, error) { - links := make(chan coreiface.DirEntry, len(ndlinks)) - for _, l := range ndlinks { - lr := ft.LinkResult{Link: &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}} +func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, <-chan error) { + out := make(chan coreiface.DirEntry, uio.DefaultShardWidth) + errOut := make(chan error, 1) - links <- api.processLink(ctx, lr, settings) // TODO: can be parallel if settings.Async - } - close(links) - return links, nil + go func() { + defer close(out) + defer close(errOut) + + for _, l := range ndlinks { + lr := ft.LinkResult{Link: &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}} + dirEnt, err := api.processLink(ctx, lr, settings) // TODO: perf: processing can be done in background and in parallel + if err != nil { + errOut <- err + return + } + select { + case out <- dirEnt: + case <-ctx.Done(): + return + } + } + }() + + return out, errOut } func (api *UnixfsAPI) core() *CoreAPI { diff --git a/core/coreiface/pin.go b/core/coreiface/pin.go index ed837fc9ce2..39cfc5941da 100644 --- a/core/coreiface/pin.go +++ b/core/coreiface/pin.go @@ -18,9 +18,6 @@ type Pin interface { // Type of the pin Type() string - - // if not nil, an error happened. Everything else should be ignored. - Err() error } // PinStatus holds information about pin health @@ -51,7 +48,7 @@ type PinAPI interface { Add(context.Context, path.Path, ...options.PinAddOption) error // Ls returns list of pinned objects on this node - Ls(context.Context, ...options.PinLsOption) (<-chan Pin, error) + Ls(context.Context, ...options.PinLsOption) (<-chan Pin, <-chan error) // IsPinned returns whether or not the given cid is pinned // and an explanation of why its pinned diff --git a/core/coreiface/tests/block.go b/core/coreiface/tests/block.go index 3b4ca0bc05d..f59bf1fa871 100644 --- a/core/coreiface/tests/block.go +++ b/core/coreiface/tests/block.go @@ -323,9 +323,14 @@ func (tp *TestSuite) TestBlockPin(t *testing.T) { t.Fatal(err) } - if pins, err := api.Pin().Ls(ctx); err != nil || len(pins) != 0 { + pinsCh, errCh := api.Pin().Ls(ctx) + _, ok := <-pinsCh + if ok { t.Fatal("expected 0 pins") } + if err = <-errCh; err != nil { + t.Fatal(err) + } res, err := api.Block().Put( ctx, diff --git a/core/coreiface/tests/pin.go b/core/coreiface/tests/pin.go index fdd7c15ccbf..a134854931c 100644 --- a/core/coreiface/tests/pin.go +++ b/core/coreiface/tests/pin.go @@ -593,19 +593,15 @@ func assertNotPinned(t *testing.T, ctx context.Context, api iface.CoreAPI, p pat } } -func accPins(pins <-chan iface.Pin, err error) ([]iface.Pin, error) { - if err != nil { - return nil, err - } - - var result []iface.Pin +func accPins(pins <-chan iface.Pin, errs <-chan error) ([]iface.Pin, error) { + var results []iface.Pin for pin := range pins { - if pin.Err() != nil { - return nil, pin.Err() - } - result = append(result, pin) + results = append(results, pin) + } + if err := <-errs; err != nil { + return nil, err } - return result, nil + return results, nil } diff --git a/core/coreiface/tests/unixfs.go b/core/coreiface/tests/unixfs.go index 9d3362b9aff..1ff4f6f2c5a 100644 --- a/core/coreiface/tests/unixfs.go +++ b/core/coreiface/tests/unixfs.go @@ -681,14 +681,11 @@ func (tp *TestSuite) TestLs(t *testing.T) { t.Fatal(err) } - entries, err := api.Unixfs().Ls(ctx, p) - if err != nil { - t.Fatal(err) - } + entries, errCh := api.Unixfs().Ls(ctx, p) - entry := <-entries - if entry.Err != nil { - t.Fatal(entry.Err) + entry, ok := <-entries + if !ok { + t.Fatal("expected another entry") } if entry.Size != 15 { t.Errorf("expected size = 15, got %d", entry.Size) @@ -702,9 +699,9 @@ func (tp *TestSuite) TestLs(t *testing.T) { if entry.Cid.String() != "QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr" { t.Errorf("expected cid = QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr, got %s", entry.Cid) } - entry = <-entries - if entry.Err != nil { - t.Fatal(entry.Err) + entry, ok = <-entries + if !ok { + t.Fatal("expected another entry") } if entry.Type != coreiface.TSymlink { t.Errorf("wrong type %s", entry.Type) @@ -716,11 +713,12 @@ func (tp *TestSuite) TestLs(t *testing.T) { t.Errorf("expected symlink target to be /foo/bar, got %s", entry.Target) } - if l, ok := <-entries; ok { - t.Errorf("didn't expect a second link") - if l.Err != nil { - t.Error(l.Err) - } + _, ok = <-entries + if ok { + t.Errorf("didn't expect a another link") + } + if err = <-errCh; err != nil { + t.Error(err) } } @@ -779,13 +777,17 @@ func (tp *TestSuite) TestLsEmptyDir(t *testing.T) { t.Fatal(err) } - links, err := api.Unixfs().Ls(ctx, p) - if err != nil { + links, errCh := api.Unixfs().Ls(ctx, p) + var count int + for range links { + count++ + } + if err = <-errCh; err != nil { t.Fatal(err) } - if len(links) != 0 { - t.Fatalf("expected 0 links, got %d", len(links)) + if count != 0 { + t.Fatalf("expected 0 links, got %d", count) } } @@ -808,13 +810,17 @@ func (tp *TestSuite) TestLsNonUnixfs(t *testing.T) { t.Fatal(err) } - links, err := api.Unixfs().Ls(ctx, path.FromCid(nd.Cid())) - if err != nil { + links, errCh := api.Unixfs().Ls(ctx, path.FromCid(nd.Cid())) + var count int + for range links { + count++ + } + if err = <-errCh; err != nil { t.Fatal(err) } - if len(links) != 0 { - t.Fatalf("expected 0 links, got %d", len(links)) + if count != 0 { + t.Fatalf("expected 0 links, got %d", count) } } diff --git a/core/coreiface/unixfs.go b/core/coreiface/unixfs.go index c0150bd12c6..32bedd93ea2 100644 --- a/core/coreiface/unixfs.go +++ b/core/coreiface/unixfs.go @@ -63,8 +63,6 @@ type DirEntry struct { Mode os.FileMode ModTime time.Time - - Err error } // UnixfsAPI is the basic interface to immutable files in IPFS @@ -81,7 +79,50 @@ type UnixfsAPI interface { // to operations performed on the returned file Get(context.Context, path.Path) (files.Node, error) - // Ls returns the list of links in a directory. Links aren't guaranteed to be - // returned in order - Ls(context.Context, path.Path, ...options.UnixfsLsOption) (<-chan DirEntry, error) + // Ls returns the list of links in a directory. Links aren't guaranteed to + // be returned in order. If an error occurs, the DirEntry channel is closed + // and an error is output on the error channel. Both channels are closed if + // the context is canceled. + // + // Example: + // + // dirs, errs := Ls(ctx, p) + // for dirEnt := range dirs { + // fmt.Println("Dir name:", dirEnt.Name) + // } + // err := <-errs + // if err != nil { + // return fmt.Errorf("error listing directory: %w", err) + // } + Ls(context.Context, path.Path, ...options.UnixfsLsOption) (<-chan DirEntry, <-chan error) +} + +/* TODO: Uncomment after go1.23 required. +// LsIter returns a go iterator that allows ranging over DirEntry results. +// Iteration stops if the context is canceled or if the iterator yields an +// error. +// +// Exmaple: +// +// for dirEnt, err := LsIter(ctx, ufsAPI, p) { +// if err != nil { +// return fmt.Errorf("error listing directory: %w", err) +// } +// fmt.Println("Dir name:", dirEnt.Name) +// } +func LsIter(ctx context.Context, api UnixfsAPI, p path.Path, opts ...options.UnixfsLsOption) iter.Seq2[DirEntry, error] { + return func(yield func(DirEntry, error) bool) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() // cancel Ls if done iterating early + results, asyncErr := api.Ls(ctx, p, opts...) + for result := range results { + if !yield(result, nil) { + return + } + } + if err != <-asyncErr; err != nil { + yield(nil, err) + } + } } +*/ From 09815c11f1381bacd7997e3b624a06cc920ce998 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 16 Oct 2024 05:03:38 -1000 Subject: [PATCH 2/6] Add LsIter to iterate items from UnixfsAPI interfaces --- core/coreiface/unixfs.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/coreiface/unixfs.go b/core/coreiface/unixfs.go index 32bedd93ea2..6bdcdfa5045 100644 --- a/core/coreiface/unixfs.go +++ b/core/coreiface/unixfs.go @@ -2,6 +2,7 @@ package iface import ( "context" + "iter" "os" "time" @@ -97,7 +98,6 @@ type UnixfsAPI interface { Ls(context.Context, path.Path, ...options.UnixfsLsOption) (<-chan DirEntry, <-chan error) } -/* TODO: Uncomment after go1.23 required. // LsIter returns a go iterator that allows ranging over DirEntry results. // Iteration stops if the context is canceled or if the iterator yields an // error. @@ -120,9 +120,8 @@ func LsIter(ctx context.Context, api UnixfsAPI, p path.Path, opts ...options.Uni return } } - if err != <-asyncErr; err != nil { - yield(nil, err) + if err := <-asyncErr; err != nil { + yield(DirEntry{}, err) } } } -*/ From 58f7a2520bd2ab453edf1aa5da00d58e4dc7ae62 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 7 Nov 2024 07:57:37 -1000 Subject: [PATCH 3/6] wip --- client/rpc/pin.go | 63 +++++-------- client/rpc/unixfs.go | 113 ++++++++++------------- core/commands/ls.go | 17 +++- core/commands/pin/pin.go | 12 ++- core/commands/pin/remotepin.go | 20 ++-- core/coreapi/pin.go | 162 +++++++++++++++------------------ core/coreiface/pin.go | 5 +- core/coreiface/tests/block.go | 13 ++- core/coreiface/tests/pin.go | 34 ++++--- core/coreiface/tests/unixfs.go | 2 +- 10 files changed, 205 insertions(+), 236 deletions(-) diff --git a/client/rpc/pin.go b/client/rpc/pin.go index eb2aa316cf1..c63ca515ff9 100644 --- a/client/rpc/pin.go +++ b/client/rpc/pin.go @@ -62,16 +62,12 @@ type pinLsObject struct { Type string } -func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, <-chan error) { - pins := make(chan iface.Pin) - errOut := make(chan error, 1) +func (api *PinAPI) Ls(ctx context.Context, pins chan<- iface.Pin, opts ...caopts.PinLsOption) error { + defer close(pins) options, err := caopts.PinLsOptions(opts...) if err != nil { - errOut <- err - close(pins) - close(errOut) - return pins, errOut + return err } res, err := api.core().Request("pin/ls"). @@ -79,42 +75,33 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan i Option("stream", true). Send(ctx) if err != nil { - errOut <- err - close(pins) - close(errOut) - return pins, errOut + return err } - - go func(ch chan<- iface.Pin, errCh chan<- error) { - defer res.Output.Close() - defer close(ch) - defer close(errCh) - - dec := json.NewDecoder(res.Output) - var out pinLsObject - for { - err := dec.Decode(&out) - if err != nil { - if err != io.EOF { - errCh <- err - } - return + defer res.Output.Close() + + dec := json.NewDecoder(res.Output) + var out pinLsObject + for { + err := dec.Decode(&out) + if err != nil { + if err != io.EOF { + return err } + return nil + } - c, err := cid.Parse(out.Cid) - if err != nil { - errCh <- err - return - } + c, err := cid.Parse(out.Cid) + if err != nil { + return err + } - select { - case ch <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}: - case <-ctx.Done(): - return - } + select { + case pins <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}: + case <-ctx.Done(): + return ctx.Err() } - }(pins, errOut) - return pins, errOut + } + return nil } // IsPinned returns whether or not the given cid is pinned diff --git a/client/rpc/unixfs.go b/client/rpc/unixfs.go index c913db67a8f..70eb92a3a1d 100644 --- a/client/rpc/unixfs.go +++ b/client/rpc/unixfs.go @@ -144,16 +144,12 @@ type lsOutput struct { Objects []lsObject } -func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, <-chan error) { - out := make(chan iface.DirEntry) - errOut := make(chan error, 1) +func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirEntry, opts ...caopts.UnixfsLsOption) error { + defer close(out) options, err := caopts.UnixfsLsOptions(opts...) if err != nil { - errOut <- err - close(out) - close(errOut) - return out, errOut + return err } resp, err := api.core().Request("ls", p.String()). @@ -162,79 +158,66 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs Option("stream", true). Send(ctx) if err != nil { - errOut <- err - close(out) - close(errOut) - return out, errOut + return err } if resp.Error != nil { - errOut <- resp.Error - close(out) - close(errOut) - return out, errOut + return err } + defer resp.Close() dec := json.NewDecoder(resp.Output) - go func() { - defer resp.Close() - defer close(out) - defer close(errOut) - - for { - var link lsOutput - if err := dec.Decode(&link); err != nil { - if err != io.EOF { - errOut <- err - } - return + for { + var link lsOutput + if err = dec.Decode(&link); err != nil { + if err != io.EOF { + return err } + return nil + } - if len(link.Objects) != 1 { - errOut <- errors.New("unexpected Objects len") - return - } + if len(link.Objects) != 1 { + return errors.New("unexpected Objects len") + } - if len(link.Objects[0].Links) != 1 { - errOut <- errors.New("unexpected Links len") - return - } + if len(link.Objects[0].Links) != 1 { + return errors.New("unexpected Links len") + } - l0 := link.Objects[0].Links[0] + l0 := link.Objects[0].Links[0] - c, err := cid.Decode(l0.Hash) - if err != nil { - errOut <- err - return - } + c, err := cid.Decode(l0.Hash) + if err != nil { + return err + } - var ftype iface.FileType - switch l0.Type { - case unixfs.TRaw, unixfs.TFile: - ftype = iface.TFile - case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata: - ftype = iface.TDirectory - case unixfs.TSymlink: - ftype = iface.TSymlink - } + var ftype iface.FileType + switch l0.Type { + case unixfs.TRaw, unixfs.TFile: + ftype = iface.TFile + case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata: + ftype = iface.TDirectory + case unixfs.TSymlink: + ftype = iface.TSymlink + } - select { - case out <- iface.DirEntry{ - Name: l0.Name, - Cid: c, - Size: l0.Size, - Type: ftype, - Target: l0.Target, - - Mode: l0.Mode, - ModTime: l0.ModTime, - }: - case <-ctx.Done(): - } + select { + case out <- iface.DirEntry{ + Name: l0.Name, + Cid: c, + Size: l0.Size, + Type: ftype, + Target: l0.Target, + + Mode: l0.Mode, + ModTime: l0.ModTime, + }: + case <-ctx.Done(): + return ctx.Err() } - }() + } - return out, errOut + return nil } func (api *UnixfsAPI) core() *HttpApi { diff --git a/core/commands/ls.go b/core/commands/ls.go index 15db7ff53d7..da1d9ebc530 100644 --- a/core/commands/ls.go +++ b/core/commands/ls.go @@ -133,14 +133,21 @@ The JSON output contains type information. } } + lsCtx, cancel := context.WithCancel(req.Context) + defer cancel() + for i, fpath := range paths { pth, err := cmdutils.PathOrCidPath(fpath) if err != nil { return err } - results, errCh := api.Unixfs().Ls(req.Context, pth, - options.Unixfs.ResolveChildren(resolveSize || resolveType)) + results := make(chan iface.DirEntry) + var lsErr error + go func() { + lsErr = api.Unixfs().Ls(lsCtx, pth, results, + options.Unixfs.ResolveChildren(resolveSize || resolveType)) + }() processLink, dirDone = processDir() for link := range results { @@ -164,12 +171,12 @@ The JSON output contains type information. Mode: link.Mode, ModTime: link.ModTime, } - if err := processLink(paths[i], lsLink); err != nil { + if err = processLink(paths[i], lsLink); err != nil { return err } } - if err = <-errCh; err != nil { - return err + if lsErr != nil { + return lsErr } dirDone(i) } diff --git a/core/commands/pin/pin.go b/core/commands/pin/pin.go index bd9c43a8326..a73e352e268 100644 --- a/core/commands/pin/pin.go +++ b/core/commands/pin/pin.go @@ -557,7 +557,15 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api panic("unhandled pin type") } - pins, errCh := api.Pin().Ls(req.Context, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name)) + var lsErr error + pins := make(chan iface.Pin) + lsCtx, cancel := context.WithCancel(req.Context) + defer cancel() + + go func() { + lsErr = api.Pin().Ls(lsCtx, opt, pins, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name)) + }() + for p := range pins { err = emit(PinLsOutputWrapper{ PinLsObject: PinLsObject{ @@ -570,7 +578,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api return err } } - return <-errCh + return lsErr } const ( diff --git a/core/commands/pin/remotepin.go b/core/commands/pin/remotepin.go index 40cb515c5f3..7319bcd9644 100644 --- a/core/commands/pin/remotepin.go +++ b/core/commands/pin/remotepin.go @@ -313,7 +313,7 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states. } // Executes GET /pins/?query-with-filters -func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (<-chan pinclient.PinStatusGetter, <-chan error) { +func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client, out chan<- pinclient.PinStatusGetter) error { opts := []pinclient.LsOption{} if name, nameFound := req.Options[pinNameOptionName]; nameFound { nameStr := name.(string) @@ -326,12 +326,8 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (<-ch for _, rawCID := range cidsRawArr { parsedCID, err := cid.Decode(rawCID) if err != nil { - psCh := make(chan pinclient.PinStatusGetter) - errCh := make(chan error, 1) - errCh <- fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err) - close(psCh) - close(errCh) - return psCh, errCh + close(out) + return fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err) } parsedCIDs = append(parsedCIDs, parsedCID) } @@ -343,19 +339,15 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (<-ch for _, rawStatus := range statusRawArr { s := pinclient.Status(rawStatus) if s.String() == string(pinclient.StatusUnknown) { - psCh := make(chan pinclient.PinStatusGetter) - errCh := make(chan error, 1) - errCh <- fmt.Errorf("status %q is not valid", rawStatus) - close(psCh) - close(errCh) - return psCh, errCh + close(out) + return fmt.Errorf("status %q is not valid", rawStatus) } parsedStatuses = append(parsedStatuses, s) } opts = append(opts, pinclient.PinOpts.FilterStatus(parsedStatuses...)) } - return c.Ls(ctx, opts...) + return c.Ls(ctx, out, opts...) } var rmRemotePinCmd = &cmds.Command{ diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index c343f4e25c9..eb3cdf3fec6 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -51,18 +51,14 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp return api.pinning.Flush(ctx) } -func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan coreiface.Pin, <-chan error) { +func (api *PinAPI) Ls(ctx context.Context, pins chan<- coreiface.Pin, opts ...caopts.PinLsOption) error { ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Ls") defer span.End() settings, err := caopts.PinLsOptions(opts...) if err != nil { - outCh := make(chan coreiface.Pin) - errCh := make(chan error, 1) - errCh <- err - close(outCh) - close(errCh) - return outCh, errCh + close(pins) + return err } span.SetAttributes(attribute.String("type", settings.Type)) @@ -70,15 +66,11 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan c switch settings.Type { case "all", "direct", "indirect", "recursive": default: - outCh := make(chan coreiface.Pin) - errCh := make(chan error, 1) - errCh <- fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type) - close(outCh) - close(errCh) - return outCh, errCh + close(pins) + return fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type) } - return api.pinLsAll(ctx, settings.Type, settings.Detailed, settings.Name) + return api.pinLsAll(ctx, settings.Type, settings.Detailed, settings.Name, pins) } func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) { @@ -283,8 +275,8 @@ func (p *pinInfo) Name() string { // // The caller must keep reading results until the channel is closed to prevent // leaking the goroutine that is fetching pins. -func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, name string) (<-chan coreiface.Pin, <-chan error) { - out := make(chan coreiface.Pin, 1) +func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, name string, out chan<- coreiface.Pin) error { + defer close(out) emittedSet := cid.NewSet() AddToResultKeys := func(c cid.Cid, pinName, typeStr string) error { @@ -302,90 +294,82 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, return nil } - errOut := make(chan error, 1) - - go func() { - defer close(out) - defer close(errOut) - - var rkeys []cid.Cid - var err error - if typeStr == "recursive" || typeStr == "all" { - for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) { - if streamedCid.Err != nil { - errOut <- streamedCid.Err - return - } - if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "recursive"); err != nil { - errOut <- err - return - } - rkeys = append(rkeys, streamedCid.Pin.Key) + var rkeys []cid.Cid + var err error + if typeStr == "recursive" || typeStr == "all" { + for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) { + if streamedCid.Err != nil { + return streamedCid.Err } + if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "recursive"); err != nil { + return err + } + rkeys = append(rkeys, streamedCid.Pin.Key) } - if typeStr == "direct" || typeStr == "all" { - for streamedCid := range api.pinning.DirectKeys(ctx, detailed) { - if streamedCid.Err != nil { - errOut <- streamedCid.Err - return - } - if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "direct"); err != nil { - errOut <- err - return - } + } + if typeStr == "direct" || typeStr == "all" { + for streamedCid := range api.pinning.DirectKeys(ctx, detailed) { + if streamedCid.Err != nil { + return streamedCid.Err + } + if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "direct"); err != nil { + return err } } - if typeStr == "indirect" { - // We need to first visit the direct pins that have priority - // without emitting them - - for streamedCid := range api.pinning.DirectKeys(ctx, detailed) { - if streamedCid.Err != nil { - errOut <- streamedCid.Err - return - } - emittedSet.Add(streamedCid.Pin.Key) + } + if typeStr == "indirect" { + // We need to first visit the direct pins that have priority + // without emitting them + + for streamedCid := range api.pinning.DirectKeys(ctx, detailed) { + if streamedCid.Err != nil { + return streamedCid.Err } + emittedSet.Add(streamedCid.Pin.Key) + } - for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) { - if streamedCid.Err != nil { - errOut <- streamedCid.Err - return - } - emittedSet.Add(streamedCid.Pin.Key) - rkeys = append(rkeys, streamedCid.Pin.Key) + for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) { + if streamedCid.Err != nil { + return streamedCid.Err } + emittedSet.Add(streamedCid.Pin.Key) + rkeys = append(rkeys, streamedCid.Pin.Key) + } + } + if typeStr == "indirect" || typeStr == "all" { + if len(rkeys) == 0 { + return nil } - if typeStr == "indirect" || typeStr == "all" { - walkingSet := cid.NewSet() - for _, k := range rkeys { - err = merkledag.Walk( - ctx, merkledag.GetLinksWithDAG(api.dag), k, - func(c cid.Cid) bool { - if !walkingSet.Visit(c) { - return false - } - if emittedSet.Has(c) { - return true // skipped - } - err := AddToResultKeys(c, "", "indirect") - if err != nil { - errOut <- err - return false - } - return true - }, - merkledag.SkipRoot(), merkledag.Concurrent(), - ) - if err != nil { - errOut <- err - return - } + var addErr error + walkingSet := cid.NewSet() + for _, k := range rkeys { + err = merkledag.Walk( + ctx, merkledag.GetLinksWithDAG(api.dag), k, + func(c cid.Cid) bool { + if !walkingSet.Visit(c) { + return false + } + if emittedSet.Has(c) { + return true // skipped + } + addErr := AddToResultKeys(c, "", "indirect") + if addErr != nil { + return false + } + return true + }, + merkledag.SkipRoot(), merkledag.Concurrent(), + ) + if err != nil { + return err + } + if addErr != nil { + return addErr } } - }() + } - return out, errOut + return nil } func (api *PinAPI) core() coreiface.CoreAPI { diff --git a/core/coreiface/pin.go b/core/coreiface/pin.go index 39cfc5941da..e0fd2fb90ed 100644 --- a/core/coreiface/pin.go +++ b/core/coreiface/pin.go @@ -47,8 +47,9 @@ type PinAPI interface { // tree Add(context.Context, path.Path, ...options.PinAddOption) error - // Ls returns list of pinned objects on this node - Ls(context.Context, ...options.PinLsOption) (<-chan Pin, <-chan error) + // Ls returns this node's pinned objects on the provided channel. The + // channel is closed when there are no more pins and an error is returned. + Ls(context.Context, chan<- Pin, ...options.PinLsOption) error // IsPinned returns whether or not the given cid is pinned // and an explanation of why its pinned diff --git a/core/coreiface/tests/block.go b/core/coreiface/tests/block.go index f59bf1fa871..2b5a68a63b8 100644 --- a/core/coreiface/tests/block.go +++ b/core/coreiface/tests/block.go @@ -323,12 +323,15 @@ func (tp *TestSuite) TestBlockPin(t *testing.T) { t.Fatal(err) } - pinsCh, errCh := api.Pin().Ls(ctx) - _, ok := <-pinsCh - if ok { + pinCh := make(chan coreiface.Pin) + go func() { + err = api.Pin().Ls(ctx, pinCh) + }() + + for range pinCh { t.Fatal("expected 0 pins") } - if err = <-errCh; err != nil { + if err != nil { t.Fatal(err) } @@ -342,7 +345,7 @@ func (tp *TestSuite) TestBlockPin(t *testing.T) { t.Fatal(err) } - pins, err := accPins(api.Pin().Ls(ctx)) + pins, err := accPins(ctx, api) if err != nil { t.Fatal(err) } diff --git a/core/coreiface/tests/pin.go b/core/coreiface/tests/pin.go index a134854931c..4c606323fec 100644 --- a/core/coreiface/tests/pin.go +++ b/core/coreiface/tests/pin.go @@ -67,7 +67,7 @@ func (tp *TestSuite) TestPinSimple(t *testing.T) { t.Fatal(err) } - list, err := accPins(api.Pin().Ls(ctx)) + list, err := accPins(ctx, api) if err != nil { t.Fatal(err) } @@ -91,7 +91,7 @@ func (tp *TestSuite) TestPinSimple(t *testing.T) { t.Fatal(err) } - list, err = accPins(api.Pin().Ls(ctx)) + list, err = accPins(ctx, api) if err != nil { t.Fatal(err) } @@ -143,7 +143,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) { t.Fatal(err) } - list, err := accPins(api.Pin().Ls(ctx)) + list, err := accPins(ctx, api) if err != nil { t.Fatal(err) } @@ -152,7 +152,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) { t.Errorf("unexpected pin list len: %d", len(list)) } - list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Direct())) + list, err = accPins(ctx, api, opt.Pin.Ls.Direct()) if err != nil { t.Fatal(err) } @@ -165,7 +165,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) { t.Errorf("unexpected path, %s != %s", list[0].Path().String(), path.FromCid(nd3.Cid()).String()) } - list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Recursive())) + list, err = accPins(ctx, api, opt.Pin.Ls.Recursive()) if err != nil { t.Fatal(err) } @@ -178,7 +178,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) { t.Errorf("unexpected path, %s != %s", list[0].Path().String(), path.FromCid(nd2.Cid()).String()) } - list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Indirect())) + list, err = accPins(ctx, api, opt.Pin.Ls.Indirect()) if err != nil { t.Fatal(err) } @@ -436,21 +436,21 @@ func getThreeChainedNodes(t *testing.T, ctx context.Context, api iface.CoreAPI, func assertPinTypes(t *testing.T, ctx context.Context, api iface.CoreAPI, recusive, direct, indirect []cidContainer) { assertPinLsAllConsistency(t, ctx, api) - list, err := accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Recursive())) + list, err := accPins(ctx, api, opt.Pin.Ls.Recursive()) if err != nil { t.Fatal(err) } assertPinCids(t, list, recusive...) - list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Direct())) + list, err = accPins(ctx, api, opt.Pin.Ls.Direct()) if err != nil { t.Fatal(err) } assertPinCids(t, list, direct...) - list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Indirect())) + list, err = accPins(ctx, api, opt.Pin.Ls.Indirect()) if err != nil { t.Fatal(err) } @@ -500,7 +500,7 @@ func assertPinCids(t *testing.T, pins []iface.Pin, cids ...cidContainer) { // assertPinLsAllConsistency verifies that listing all pins gives the same result as listing the pin types individually func assertPinLsAllConsistency(t *testing.T, ctx context.Context, api iface.CoreAPI) { t.Helper() - allPins, err := accPins(api.Pin().Ls(ctx)) + allPins, err := accPins(ctx, api) if err != nil { t.Fatal(err) } @@ -531,7 +531,7 @@ func assertPinLsAllConsistency(t *testing.T, ctx context.Context, api iface.Core } for typeStr, pinProps := range typeMap { - pins, err := accPins(api.Pin().Ls(ctx, pinProps.PinLsOption)) + pins, err := accPins(ctx, api, pinProps.PinLsOption) if err != nil { t.Fatal(err) } @@ -593,15 +593,19 @@ func assertNotPinned(t *testing.T, ctx context.Context, api iface.CoreAPI, p pat } } -func accPins(pins <-chan iface.Pin, errs <-chan error) ([]iface.Pin, error) { - var results []iface.Pin +func accPins(ctx context.Context, api iface.CoreAPI, opts ...opt.PinLsOption) ([]iface.Pin, error) { + var err error + pins := make(chan iface.Pin) + go func() { + err = api.Pin().Ls(ctx, pins, opts...) + }() + var results []iface.Pin for pin := range pins { results = append(results, pin) } - if err := <-errs; err != nil { + if err != nil { return nil, err } - return results, nil } diff --git a/core/coreiface/tests/unixfs.go b/core/coreiface/tests/unixfs.go index 1ff4f6f2c5a..e806ae9baef 100644 --- a/core/coreiface/tests/unixfs.go +++ b/core/coreiface/tests/unixfs.go @@ -544,7 +544,7 @@ func (tp *TestSuite) TestAddPinned(t *testing.T) { t.Fatal(err) } - pins, err := accPins(api.Pin().Ls(ctx)) + pins, err := accPins(ctx, api) if err != nil { t.Fatal(err) } From 10fee6d54910a8d690c5ec896eb9538f6751b654 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Mon, 25 Nov 2024 12:36:56 -1000 Subject: [PATCH 4/6] APIs take chan argument and return error --- client/rpc/pin.go | 2 +- client/rpc/unixfs.go | 2 +- core/commands/ls.go | 9 ++-- core/commands/pin/pin.go | 8 +-- core/commands/pin/remotepin.go | 53 ++++++++++++-------- core/coreapi/unixfs.go | 90 +++++++++++++--------------------- core/coreiface/tests/unixfs.go | 20 ++++++-- core/coreiface/unixfs.go | 30 +++++++----- 8 files changed, 115 insertions(+), 99 deletions(-) diff --git a/client/rpc/pin.go b/client/rpc/pin.go index c63ca515ff9..bceb5498cea 100644 --- a/client/rpc/pin.go +++ b/client/rpc/pin.go @@ -87,7 +87,7 @@ func (api *PinAPI) Ls(ctx context.Context, pins chan<- iface.Pin, opts ...caopts if err != io.EOF { return err } - return nil + break } c, err := cid.Parse(out.Cid) diff --git a/client/rpc/unixfs.go b/client/rpc/unixfs.go index 70eb92a3a1d..faa6086c240 100644 --- a/client/rpc/unixfs.go +++ b/client/rpc/unixfs.go @@ -173,7 +173,7 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirE if err != io.EOF { return err } - return nil + break } if len(link.Objects) != 1 { diff --git a/core/commands/ls.go b/core/commands/ls.go index da1d9ebc530..bdd475d96cb 100644 --- a/core/commands/ls.go +++ b/core/commands/ls.go @@ -1,6 +1,7 @@ package commands import ( + "context" "fmt" "io" "os" @@ -143,9 +144,9 @@ The JSON output contains type information. } results := make(chan iface.DirEntry) - var lsErr error + lsErr := make(chan error, 1) go func() { - lsErr = api.Unixfs().Ls(lsCtx, pth, results, + lsErr <- api.Unixfs().Ls(lsCtx, pth, results, options.Unixfs.ResolveChildren(resolveSize || resolveType)) }() @@ -175,8 +176,8 @@ The JSON output contains type information. return err } } - if lsErr != nil { - return lsErr + if err = <-lsErr; err != nil { + return err } dirDone(i) } diff --git a/core/commands/pin/pin.go b/core/commands/pin/pin.go index a73e352e268..428a75b695d 100644 --- a/core/commands/pin/pin.go +++ b/core/commands/pin/pin.go @@ -557,13 +557,13 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api panic("unhandled pin type") } - var lsErr error - pins := make(chan iface.Pin) + pins := make(chan coreiface.Pin) + lsErr := make(chan error, 1) lsCtx, cancel := context.WithCancel(req.Context) defer cancel() go func() { - lsErr = api.Pin().Ls(lsCtx, opt, pins, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name)) + lsErr <- api.Pin().Ls(lsCtx, pins, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name)) }() for p := range pins { @@ -578,7 +578,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api return err } } - return lsErr + return <-lsErr } const ( diff --git a/core/commands/pin/remotepin.go b/core/commands/pin/remotepin.go index 7319bcd9644..fc852fbbfc6 100644 --- a/core/commands/pin/remotepin.go +++ b/core/commands/pin/remotepin.go @@ -293,14 +293,18 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states. ctx, cancel := context.WithCancel(req.Context) defer cancel() - psCh, errCh := lsRemote(ctx, req, c) + psCh := make(chan pinclient.PinStatusGetter) + lsErr := make(chan error, 1) + go func() { + lsErr <- lsRemote(ctx, req, c, psCh) + }() for ps := range psCh { if err := res.Emit(toRemotePinOutput(ps)); err != nil { return err } } - return <-errCh + return <-lsErr }, Type: RemotePinOutput{}, Encoders: cmds.EncoderMap{ @@ -347,7 +351,12 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client, out c opts = append(opts, pinclient.PinOpts.FilterStatus(parsedStatuses...)) } - return c.Ls(ctx, out, opts...) + rmtOut, rmtErr := c.Ls(ctx, opts...) + for p := range rmtOut { + out <- p + } + return <-rmtErr + //return c.Ls2(ctx, out, opts...) } var rmRemotePinCmd = &cmds.Command{ @@ -389,33 +398,37 @@ To list and then remove all pending pin requests, pass an explicit status list: cmds.BoolOption(pinForceOptionName, "Allow removal of multiple pins matching the query without additional confirmation.").WithDefault(false), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - ctx, cancel := context.WithCancel(req.Context) - defer cancel() - c, err := getRemotePinServiceFromRequest(req, env) if err != nil { return err } rmIDs := []string{} - if len(req.Arguments) == 0 { - psCh, errCh := lsRemote(ctx, req, c) - for ps := range psCh { - rmIDs = append(rmIDs, ps.GetRequestId()) - } - if err = <-errCh; err != nil { - return fmt.Errorf("error while listing remote pins: %v", err) - } - - if len(rmIDs) > 1 && !req.Options[pinForceOptionName].(bool) { - return fmt.Errorf("multiple remote pins are matching this query, add --force to confirm the bulk removal") - } - } else { + if len(req.Arguments) != 0 { return fmt.Errorf("unexpected argument %q", req.Arguments[0]) } + psCh := make(chan pinclient.PinStatusGetter) + errCh := make(chan error, 1) + ctx, cancel := context.WithCancel(req.Context) + defer cancel() + + go func() { + errCh <- lsRemote(ctx, req, c, psCh) + }() + for ps := range psCh { + rmIDs = append(rmIDs, ps.GetRequestId()) + } + if err = <-errCh; err != nil { + return fmt.Errorf("error while listing remote pins: %v", err) + } + + if len(rmIDs) > 1 && !req.Options[pinForceOptionName].(bool) { + return fmt.Errorf("multiple remote pins are matching this query, add --force to confirm the bulk removal") + } + for _, rmID := range rmIDs { - if err := c.DeleteByID(ctx, rmID); err != nil { + if err = c.DeleteByID(ctx, rmID); err != nil { return fmt.Errorf("removing pin identified by requestid=%q failed: %v", rmID, err) } } diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index b7134b0ffed..9d91f09b6ef 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -2,6 +2,7 @@ package coreapi import ( "context" + "errors" "fmt" blockservice "github.com/ipfs/boxo/blockservice" @@ -197,18 +198,15 @@ func (api *UnixfsAPI) Get(ctx context.Context, p path.Path) (files.Node, error) // Ls returns the contents of an IPFS or IPNS object(s) at path p, with the format: // ` ` -func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.UnixfsLsOption) (<-chan coreiface.DirEntry, <-chan error) { +func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- coreiface.DirEntry, opts ...options.UnixfsLsOption) error { ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "Ls", trace.WithAttributes(attribute.String("path", p.String()))) defer span.End() + defer close(out) + settings, err := options.UnixfsLsOptions(opts...) if err != nil { - errOut := make(chan error, 1) - errOut <- err - close(errOut) - out := make(chan coreiface.DirEntry) - close(out) - return out, errOut + return err } span.SetAttributes(attribute.Bool("resolvechildren", settings.ResolveChildren)) @@ -218,28 +216,18 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.Unixf dagnode, err := ses.ResolveNode(ctx, p) if err != nil { - errOut := make(chan error, 1) - errOut <- err - close(errOut) - out := make(chan coreiface.DirEntry) - close(out) - return out, errOut + return err } dir, err := uio.NewDirectoryFromNode(ses.dag, dagnode) if err != nil { - if err == uio.ErrNotADir { - return uses.lsFromLinks(ctx, dagnode.Links(), settings) + if errors.Is(err, uio.ErrNotADir) { + return uses.lsFromLinks(ctx, dagnode.Links(), settings, out) } - errOut := make(chan error, 1) - errOut <- err - close(errOut) - out := make(chan coreiface.DirEntry) - close(out) - return out, errOut + return err } - return uses.lsFromLinksAsync(ctx, dir, settings) + return uses.lsFromDirLinks(ctx, dir, settings, out) } func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) (coreiface.DirEntry, error) { @@ -300,55 +288,47 @@ func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, se return lnk, nil } -func (api *UnixfsAPI) lsFromLinksAsync(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, <-chan error) { - out := make(chan coreiface.DirEntry, uio.DefaultShardWidth) - errOut := make(chan error, 1) - - go func() { - defer close(out) - defer close(errOut) - - for l := range dir.EnumLinksAsync(ctx) { - dirEnt, err := api.processLink(ctx, l, settings) // TODO: perf: processing can be done in background and in parallel - if err != nil { - errOut <- err - return - } - select { - case out <- dirEnt: - case <-ctx.Done(): - return - } +func (api *UnixfsAPI) lsFromDirLinks(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings, out chan<- coreiface.DirEntry) error { + for l := range dir.EnumLinksAsync(ctx) { + dirEnt, err := api.processLink(ctx, l, settings) // TODO: perf: processing can be done in background and in parallel + if err != nil { + return err } - }() - - return out, errOut + select { + case out <- dirEnt: + case <-ctx.Done(): + return nil + } + } + return nil } -func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, <-chan error) { - out := make(chan coreiface.DirEntry, uio.DefaultShardWidth) - errOut := make(chan error, 1) - +func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings, out chan<- coreiface.DirEntry) error { + // Create links channel large enough to not block when writing to out is slower. + links := make(chan coreiface.DirEntry, len(ndlinks)) + errs := make(chan error, 1) go func() { - defer close(out) - defer close(errOut) - + defer close(links) + defer close(errs) for _, l := range ndlinks { lr := ft.LinkResult{Link: &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}} - dirEnt, err := api.processLink(ctx, lr, settings) // TODO: perf: processing can be done in background and in parallel + lnk, err := api.processLink(ctx, lr, settings) // TODO: can be parallel if settings.Async if err != nil { - errOut <- err + errs <- err return } select { - case out <- dirEnt: + case links <- lnk: case <-ctx.Done(): return } } }() - return out, errOut + for lnk := range links { + out <- lnk + } + return <-errs } func (api *UnixfsAPI) core() *CoreAPI { diff --git a/core/coreiface/tests/unixfs.go b/core/coreiface/tests/unixfs.go index e806ae9baef..987d39b2620 100644 --- a/core/coreiface/tests/unixfs.go +++ b/core/coreiface/tests/unixfs.go @@ -681,7 +681,11 @@ func (tp *TestSuite) TestLs(t *testing.T) { t.Fatal(err) } - entries, errCh := api.Unixfs().Ls(ctx, p) + errCh := make(chan error, 1) + entries := make(chan coreiface.DirEntry) + go func() { + errCh <- api.Unixfs().Ls(ctx, p, entries) + }() entry, ok := <-entries if !ok { @@ -777,7 +781,12 @@ func (tp *TestSuite) TestLsEmptyDir(t *testing.T) { t.Fatal(err) } - links, errCh := api.Unixfs().Ls(ctx, p) + errCh := make(chan error, 1) + links := make(chan coreiface.DirEntry) + go func() { + errCh <- api.Unixfs().Ls(ctx, p, links) + }() + var count int for range links { count++ @@ -810,7 +819,12 @@ func (tp *TestSuite) TestLsNonUnixfs(t *testing.T) { t.Fatal(err) } - links, errCh := api.Unixfs().Ls(ctx, path.FromCid(nd.Cid())) + errCh := make(chan error, 1) + links := make(chan coreiface.DirEntry) + go func() { + errCh <- api.Unixfs().Ls(ctx, path.FromCid(nd.Cid()), links) + }() + var count int for range links { count++ diff --git a/core/coreiface/unixfs.go b/core/coreiface/unixfs.go index 6bdcdfa5045..10371998c20 100644 --- a/core/coreiface/unixfs.go +++ b/core/coreiface/unixfs.go @@ -80,22 +80,25 @@ type UnixfsAPI interface { // to operations performed on the returned file Get(context.Context, path.Path) (files.Node, error) - // Ls returns the list of links in a directory. Links aren't guaranteed to - // be returned in order. If an error occurs, the DirEntry channel is closed - // and an error is output on the error channel. Both channels are closed if - // the context is canceled. + // Ls writes the links in a directory to the DirEntry channel. Links aren't + // guaranteed to be returned in order. If an error occurs or the context is + // canceled, the DirEntry channel is closed and an error is returned. // // Example: // - // dirs, errs := Ls(ctx, p) + // dirs := make(chan DirEntry) + // lsErr := make(chan error, 1) + // go func() { + // lsErr <- Ls(ctx, p, dirs) + // }() // for dirEnt := range dirs { // fmt.Println("Dir name:", dirEnt.Name) // } - // err := <-errs + // err := <-lsErr // if err != nil { // return fmt.Errorf("error listing directory: %w", err) // } - Ls(context.Context, path.Path, ...options.UnixfsLsOption) (<-chan DirEntry, <-chan error) + Ls(context.Context, path.Path, chan<- DirEntry, ...options.UnixfsLsOption) error } // LsIter returns a go iterator that allows ranging over DirEntry results. @@ -114,13 +117,18 @@ func LsIter(ctx context.Context, api UnixfsAPI, p path.Path, opts ...options.Uni return func(yield func(DirEntry, error) bool) { ctx, cancel := context.WithCancel(ctx) defer cancel() // cancel Ls if done iterating early - results, asyncErr := api.Ls(ctx, p, opts...) - for result := range results { - if !yield(result, nil) { + + dirs := make(chan DirEntry) + lsErr := make(chan error, 1) + go func() { + lsErr <- api.Ls(ctx, p, dirs, opts...) + }() + for dirEnt := range dirs { + if !yield(dirEnt, nil) { return } } - if err := <-asyncErr; err != nil { + if err := <-lsErr; err != nil { yield(DirEntry{}, err) } } From 8ac3d5b8715809ab08e5389ee5d3785e234e28d1 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Mon, 25 Nov 2024 13:26:36 -1000 Subject: [PATCH 5/6] fix sharowed variable error --- core/coreapi/pin.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index eb3cdf3fec6..878b4c28d0a 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -352,11 +352,8 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, if emittedSet.Has(c) { return true // skipped } - addErr := AddToResultKeys(c, "", "indirect") - if addErr != nil { - return false - } - return true + addErr = AddToResultKeys(c, "", "indirect") + return addErr == nil }, merkledag.SkipRoot(), merkledag.Concurrent(), ) From 22f399d601f14c93370fca1553943426acbd2528 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 27 Nov 2024 07:37:34 -1000 Subject: [PATCH 6/6] boxo with Ls2 --- core/commands/pin/remotepin.go | 14 ++++++++------ docs/examples/kubo-as-a-library/go.mod | 4 +++- docs/examples/kubo-as-a-library/go.sum | 8 ++++++-- go.mod | 4 +++- go.sum | 8 ++++++-- test/dependencies/go.mod | 2 +- test/dependencies/go.sum | 8 ++++++-- 7 files changed, 33 insertions(+), 15 deletions(-) diff --git a/core/commands/pin/remotepin.go b/core/commands/pin/remotepin.go index fc852fbbfc6..5c9bd5a954c 100644 --- a/core/commands/pin/remotepin.go +++ b/core/commands/pin/remotepin.go @@ -351,12 +351,14 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client, out c opts = append(opts, pinclient.PinOpts.FilterStatus(parsedStatuses...)) } - rmtOut, rmtErr := c.Ls(ctx, opts...) - for p := range rmtOut { - out <- p - } - return <-rmtErr - //return c.Ls2(ctx, out, opts...) + /* + rmtOut, rmtErr := c.Ls(ctx, opts...) + for p := range rmtOut { + out <- p + } + return <-rmtErr + */ + return c.Ls2(ctx, out, opts...) } var rmRemotePinCmd = &cmds.Command{ diff --git a/docs/examples/kubo-as-a-library/go.mod b/docs/examples/kubo-as-a-library/go.mod index 3c025040ccf..ee370a16a5b 100644 --- a/docs/examples/kubo-as-a-library/go.mod +++ b/docs/examples/kubo-as-a-library/go.mod @@ -7,7 +7,7 @@ go 1.23 replace github.com/ipfs/kubo => ./../../.. require ( - github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 + github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d github.com/ipfs/kubo v0.0.0-00010101000000-000000000000 github.com/libp2p/go-libp2p v0.37.2 github.com/multiformats/go-multiaddr v0.13.0 @@ -52,6 +52,8 @@ require ( github.com/francoispqt/gojay v1.2.13 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.6 // indirect + github.com/gammazero/chanqueue v1.0.0 // indirect + github.com/gammazero/deque v1.0.0 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-jose/go-jose/v4 v4.0.4 // indirect github.com/go-logr/logr v1.4.2 // indirect diff --git a/docs/examples/kubo-as-a-library/go.sum b/docs/examples/kubo-as-a-library/go.sum index 110c9bf8e17..4b218eb481a 100644 --- a/docs/examples/kubo-as-a-library/go.sum +++ b/docs/examples/kubo-as-a-library/go.sum @@ -164,6 +164,10 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= +github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o= +github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc= +github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= +github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -298,8 +302,8 @@ github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c h1:7Uy github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c/go.mod h1:6EekK/jo+TynwSE/ZOiOJd4eEvRXoavEC3vquKtv4yI= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 h1:Ox1qTlON8qG46rUL7dDEwnIt7W9MhaidtvR/97RywWw= -github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1/go.mod h1:Kxk43F+avGAsJSwhJW4isNYrpGwXHRJCvJ19Pt+MQc4= +github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d h1:UgBskn6bIS4Qf2GrRkDDwMRYhMaPiwHuCSW1cp0m62Y= +github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d/go.mod h1:lAoydO+oJhB1e7pUn4ju1Z1fuUIwy+zb0hQXRb/bu2g= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ= diff --git a/go.mod b/go.mod index a9bca9d7f33..b377bae5f12 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/hashicorp/go-version v1.7.0 github.com/ipfs-shipyard/nopfs v0.0.12 github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c - github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 + github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d github.com/ipfs/go-block-format v0.2.0 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-cidutil v0.1.0 @@ -125,6 +125,8 @@ require ( github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/gabriel-vasile/mimetype v1.4.6 // indirect + github.com/gammazero/chanqueue v1.0.0 // indirect + github.com/gammazero/deque v1.0.0 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-jose/go-jose/v4 v4.0.4 // indirect github.com/go-kit/log v0.2.1 // indirect diff --git a/go.sum b/go.sum index c29dbd816bf..4eb0aaf50f8 100644 --- a/go.sum +++ b/go.sum @@ -198,6 +198,10 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= +github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o= +github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc= +github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= +github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -362,8 +366,8 @@ github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c h1:7Uy github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c/go.mod h1:6EekK/jo+TynwSE/ZOiOJd4eEvRXoavEC3vquKtv4yI= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 h1:Ox1qTlON8qG46rUL7dDEwnIt7W9MhaidtvR/97RywWw= -github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1/go.mod h1:Kxk43F+avGAsJSwhJW4isNYrpGwXHRJCvJ19Pt+MQc4= +github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d h1:UgBskn6bIS4Qf2GrRkDDwMRYhMaPiwHuCSW1cp0m62Y= +github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d/go.mod h1:lAoydO+oJhB1e7pUn4ju1Z1fuUIwy+zb0hQXRb/bu2g= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ= diff --git a/test/dependencies/go.mod b/test/dependencies/go.mod index 93ab3f923e0..7cac621284d 100644 --- a/test/dependencies/go.mod +++ b/test/dependencies/go.mod @@ -119,7 +119,7 @@ require ( github.com/huin/goupnp v1.3.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/bbloom v0.0.4 // indirect - github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 // indirect + github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d // indirect github.com/ipfs/go-block-format v0.2.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect github.com/ipfs/go-datastore v0.6.0 // indirect diff --git a/test/dependencies/go.sum b/test/dependencies/go.sum index 11be0c149fd..4609170a790 100644 --- a/test/dependencies/go.sum +++ b/test/dependencies/go.sum @@ -162,6 +162,10 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fzipp/gocyclo v0.6.0 h1:lsblElZG7d3ALtGMx9fmxeTKZaLLpU8mET09yN4BBLo= github.com/fzipp/gocyclo v0.6.0/go.mod h1:rXPyn8fnlpa0R2csP/31uerbiVBugk5whMdlyaLkLoA= +github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o= +github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc= +github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= +github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghostiam/protogetter v0.3.6 h1:R7qEWaSgFCsy20yYHNIJsU9ZOb8TziSRRxuAOTVKeOk= github.com/ghostiam/protogetter v0.3.6/go.mod h1:7lpeDnEJ1ZjL/YtyoN99ljO4z0pd3H0d18/t2dPBxHw= @@ -318,8 +322,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 h1:Ox1qTlON8qG46rUL7dDEwnIt7W9MhaidtvR/97RywWw= -github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1/go.mod h1:Kxk43F+avGAsJSwhJW4isNYrpGwXHRJCvJ19Pt+MQc4= +github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d h1:UgBskn6bIS4Qf2GrRkDDwMRYhMaPiwHuCSW1cp0m62Y= +github.com/ipfs/boxo v0.24.4-0.20241127172419-52a6a06b605d/go.mod h1:lAoydO+oJhB1e7pUn4ju1Z1fuUIwy+zb0hQXRb/bu2g= github.com/ipfs/go-block-format v0.2.0 h1:ZqrkxBA2ICbDRbK8KJs/u0O3dlp6gmAuuXUJNiW1Ycs= github.com/ipfs/go-block-format v0.2.0/go.mod h1:+jpL11nFx5A/SPpsoBn6Bzkra/zaArfSmsknbPMYgzM= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=