Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: return errors using channels and not embedded in result type #10527

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 26 additions & 39 deletions client/rpc/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,59 +62,46 @@ type pinLsObject struct {
Type string
}

func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, error) {
func (api *PinAPI) Ls(ctx context.Context, pins chan<- iface.Pin, opts ...caopts.PinLsOption) error {
defer close(pins)

options, err := caopts.PinLsOptions(opts...)
if err != nil {
return nil, err
return err
}

res, err := api.core().Request("pin/ls").
Option("type", options.Type).
Option("stream", true).
Send(ctx)
if err != nil {
return nil, err
return err
}

pins := make(chan iface.Pin)
go func(ch chan<- iface.Pin) {
defer res.Output.Close()
defer close(ch)

dec := json.NewDecoder(res.Output)
var out pinLsObject
for {
switch err := dec.Decode(&out); err {
case nil:
case io.EOF:
return
default:
select {
case ch <- pin{err: err}:
return
case <-ctx.Done():
return
}
defer res.Output.Close()

dec := json.NewDecoder(res.Output)
var out pinLsObject
for {
err := dec.Decode(&out)
if err != nil {
if err != io.EOF {
return err
}
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
break
return nil

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a nit. No need to scroll down to the end of the loop to make sure we are fully done.

}

c, err := cid.Parse(out.Cid)
if err != nil {
select {
case ch <- pin{err: err}:
return
case <-ctx.Done():
return
}
}
c, err := cid.Parse(out.Cid)
if err != nil {
return err
}

select {
case ch <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
case <-ctx.Done():
return
}
select {
case pins <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
case <-ctx.Done():
return ctx.Err()
}
}(pins)
return pins, nil
}
return nil
}

// IsPinned returns whether or not the given cid is pinned
Expand Down
116 changes: 49 additions & 67 deletions client/rpc/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ type lsOutput struct {
Objects []lsObject
}

func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, error) {
func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirEntry, opts ...caopts.UnixfsLsOption) error {
defer close(out)

options, err := caopts.UnixfsLsOptions(opts...)
if err != nil {
return nil, err
return err
}

resp, err := api.core().Request("ls", p.String()).
Expand All @@ -156,86 +158,66 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs
Option("stream", true).
Send(ctx)
if err != nil {
return nil, err
return err
}
if resp.Error != nil {
return nil, resp.Error
return err
}
defer resp.Close()

dec := json.NewDecoder(resp.Output)
out := make(chan iface.DirEntry)

go func() {
defer resp.Close()
defer close(out)

for {
var link lsOutput
if err := dec.Decode(&link); err != nil {
if err == io.EOF {
return
}
select {
case out <- iface.DirEntry{Err: err}:
case <-ctx.Done():
}
return
for {
var link lsOutput
if err = dec.Decode(&link); err != nil {
if err != io.EOF {
return err
}
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idem

}

if len(link.Objects) != 1 {
select {
case out <- iface.DirEntry{Err: errors.New("unexpected Objects len")}:
case <-ctx.Done():
}
return
}
if len(link.Objects) != 1 {
return errors.New("unexpected Objects len")
}

if len(link.Objects[0].Links) != 1 {
select {
case out <- iface.DirEntry{Err: errors.New("unexpected Links len")}:
case <-ctx.Done():
}
return
}
if len(link.Objects[0].Links) != 1 {
return errors.New("unexpected Links len")
}

l0 := link.Objects[0].Links[0]
l0 := link.Objects[0].Links[0]

c, err := cid.Decode(l0.Hash)
if err != nil {
select {
case out <- iface.DirEntry{Err: err}:
case <-ctx.Done():
}
return
}
c, err := cid.Decode(l0.Hash)
if err != nil {
return err
}

var ftype iface.FileType
switch l0.Type {
case unixfs.TRaw, unixfs.TFile:
ftype = iface.TFile
case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
ftype = iface.TDirectory
case unixfs.TSymlink:
ftype = iface.TSymlink
}
var ftype iface.FileType
switch l0.Type {
case unixfs.TRaw, unixfs.TFile:
ftype = iface.TFile
case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
ftype = iface.TDirectory
case unixfs.TSymlink:
ftype = iface.TSymlink
}

select {
case out <- iface.DirEntry{
Name: l0.Name,
Cid: c,
Size: l0.Size,
Type: ftype,
Target: l0.Target,

Mode: l0.Mode,
ModTime: l0.ModTime,
}:
case <-ctx.Done():
}
select {
case out <- iface.DirEntry{
Name: l0.Name,
Cid: c,
Size: l0.Size,
Type: ftype,
Target: l0.Target,

Mode: l0.Mode,
ModTime: l0.ModTime,
}:
case <-ctx.Done():
return ctx.Err()
}
}()
}

return out, nil
return nil
}

func (api *UnixfsAPI) core() *HttpApi {
Expand Down
23 changes: 14 additions & 9 deletions core/commands/ls.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package commands

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -133,23 +134,24 @@ The JSON output contains type information.
}
}

lsCtx, cancel := context.WithCancel(req.Context)
defer cancel()

for i, fpath := range paths {
pth, err := cmdutils.PathOrCidPath(fpath)
if err != nil {
return err
}

results, err := api.Unixfs().Ls(req.Context, pth,
options.Unixfs.ResolveChildren(resolveSize || resolveType))
if err != nil {
return err
}
results := make(chan iface.DirEntry)
lsErr := make(chan error, 1)
go func() {
lsErr <- api.Unixfs().Ls(lsCtx, pth, results,
options.Unixfs.ResolveChildren(resolveSize || resolveType))
}()

processLink, dirDone = processDir()
for link := range results {
if link.Err != nil {
return link.Err
}
var ftype unixfs_pb.Data_DataType
switch link.Type {
case iface.TFile:
Expand All @@ -170,10 +172,13 @@ The JSON output contains type information.
Mode: link.Mode,
ModTime: link.ModTime,
}
if err := processLink(paths[i], lsLink); err != nil {
if err = processLink(paths[i], lsLink); err != nil {
return err
}
}
if err = <-lsErr; err != nil {
return err
}
dirDone(i)
}
return done()
Expand Down
18 changes: 9 additions & 9 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,15 +557,16 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
panic("unhandled pin type")
}

pins, err := api.Pin().Ls(req.Context, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
if err != nil {
return err
}
pins := make(chan coreiface.Pin)
lsErr := make(chan error, 1)
lsCtx, cancel := context.WithCancel(req.Context)
defer cancel()

go func() {
lsErr <- api.Pin().Ls(lsCtx, pins, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
}()

for p := range pins {
if err := p.Err(); err != nil {
return err
}
err = emit(PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: p.Type(),
Expand All @@ -577,8 +578,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
return err
}
}

return nil
return <-lsErr
}

const (
Expand Down
Loading
Loading