Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Nov 20, 2024
1 parent 7bf7654 commit 89890b4
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 236 deletions.
63 changes: 25 additions & 38 deletions client/rpc/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,59 +62,46 @@ type pinLsObject struct {
Type string
}

func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, <-chan error) {
pins := make(chan iface.Pin)
errOut := make(chan error, 1)
func (api *PinAPI) Ls(ctx context.Context, pins chan<- iface.Pin, opts ...caopts.PinLsOption) error {
defer close(pins)

options, err := caopts.PinLsOptions(opts...)
if err != nil {
errOut <- err
close(pins)
close(errOut)
return pins, errOut
return err
}

res, err := api.core().Request("pin/ls").
Option("type", options.Type).
Option("stream", true).
Send(ctx)
if err != nil {
errOut <- err
close(pins)
close(errOut)
return pins, errOut
return err
}

go func(ch chan<- iface.Pin, errCh chan<- error) {
defer res.Output.Close()
defer close(ch)
defer close(errCh)

dec := json.NewDecoder(res.Output)
var out pinLsObject
for {
err := dec.Decode(&out)
if err != nil {
if err != io.EOF {
errCh <- err
}
return
defer res.Output.Close()

dec := json.NewDecoder(res.Output)
var out pinLsObject
for {
err := dec.Decode(&out)
if err != nil {
if err != io.EOF {
return err
}
return nil
}

c, err := cid.Parse(out.Cid)
if err != nil {
errCh <- err
return
}
c, err := cid.Parse(out.Cid)
if err != nil {
return err
}

select {
case ch <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
case <-ctx.Done():
return
}
select {
case pins <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
case <-ctx.Done():
return ctx.Err()
}
}(pins, errOut)
return pins, errOut
}
return nil
}

// IsPinned returns whether or not the given cid is pinned
Expand Down
113 changes: 48 additions & 65 deletions client/rpc/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,12 @@ type lsOutput struct {
Objects []lsObject
}

func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, <-chan error) {
out := make(chan iface.DirEntry)
errOut := make(chan error, 1)
func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirEntry, opts ...caopts.UnixfsLsOption) error {
defer close(out)

options, err := caopts.UnixfsLsOptions(opts...)
if err != nil {
errOut <- err
close(out)
close(errOut)
return out, errOut
return err
}

resp, err := api.core().Request("ls", p.String()).
Expand All @@ -162,79 +158,66 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs
Option("stream", true).
Send(ctx)
if err != nil {
errOut <- err
close(out)
close(errOut)
return out, errOut
return err
}
if resp.Error != nil {
errOut <- resp.Error
close(out)
close(errOut)
return out, errOut
return err
}
defer resp.Close()

dec := json.NewDecoder(resp.Output)

go func() {
defer resp.Close()
defer close(out)
defer close(errOut)

for {
var link lsOutput
if err := dec.Decode(&link); err != nil {
if err != io.EOF {
errOut <- err
}
return
for {
var link lsOutput
if err = dec.Decode(&link); err != nil {
if err != io.EOF {
return err
}
return nil
}

if len(link.Objects) != 1 {
errOut <- errors.New("unexpected Objects len")
return
}
if len(link.Objects) != 1 {
return errors.New("unexpected Objects len")
}

if len(link.Objects[0].Links) != 1 {
errOut <- errors.New("unexpected Links len")
return
}
if len(link.Objects[0].Links) != 1 {
return errors.New("unexpected Links len")
}

l0 := link.Objects[0].Links[0]
l0 := link.Objects[0].Links[0]

c, err := cid.Decode(l0.Hash)
if err != nil {
errOut <- err
return
}
c, err := cid.Decode(l0.Hash)
if err != nil {
return err
}

var ftype iface.FileType
switch l0.Type {
case unixfs.TRaw, unixfs.TFile:
ftype = iface.TFile
case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
ftype = iface.TDirectory
case unixfs.TSymlink:
ftype = iface.TSymlink
}
var ftype iface.FileType
switch l0.Type {
case unixfs.TRaw, unixfs.TFile:
ftype = iface.TFile
case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
ftype = iface.TDirectory
case unixfs.TSymlink:
ftype = iface.TSymlink
}

select {
case out <- iface.DirEntry{
Name: l0.Name,
Cid: c,
Size: l0.Size,
Type: ftype,
Target: l0.Target,

Mode: l0.Mode,
ModTime: l0.ModTime,
}:
case <-ctx.Done():
}
select {
case out <- iface.DirEntry{
Name: l0.Name,
Cid: c,
Size: l0.Size,
Type: ftype,
Target: l0.Target,

Mode: l0.Mode,
ModTime: l0.ModTime,
}:
case <-ctx.Done():
return ctx.Err()
}
}()
}

return out, errOut
return nil
}

func (api *UnixfsAPI) core() *HttpApi {
Expand Down
17 changes: 12 additions & 5 deletions core/commands/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,21 @@ The JSON output contains type information.
}
}

lsCtx, cancel := context.WithCancel(req.Context)
defer cancel()

for i, fpath := range paths {
pth, err := cmdutils.PathOrCidPath(fpath)
if err != nil {
return err
}

results, errCh := api.Unixfs().Ls(req.Context, pth,
options.Unixfs.ResolveChildren(resolveSize || resolveType))
results := make(chan iface.DirEntry)
var lsErr error
go func() {
lsErr = api.Unixfs().Ls(lsCtx, pth, results,
options.Unixfs.ResolveChildren(resolveSize || resolveType))
}()

processLink, dirDone = processDir()
for link := range results {
Expand All @@ -164,12 +171,12 @@ The JSON output contains type information.
Mode: link.Mode,
ModTime: link.ModTime,
}
if err := processLink(paths[i], lsLink); err != nil {
if err = processLink(paths[i], lsLink); err != nil {
return err
}
}
if err = <-errCh; err != nil {
return err
if lsErr != nil {
return lsErr
}
dirDone(i)
}
Expand Down
12 changes: 10 additions & 2 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,15 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
panic("unhandled pin type")
}

pins, errCh := api.Pin().Ls(req.Context, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
var lsErr error
pins := make(chan iface.Pin)

Check failure on line 561 in core/commands/pin/pin.go

View workflow job for this annotation

GitHub Actions / go-check

undefined: iface

Check failure on line 561 in core/commands/pin/pin.go

View workflow job for this annotation

GitHub Actions / go-build

undefined: iface

Check failure on line 561 in core/commands/pin/pin.go

View workflow job for this annotation

GitHub Actions / interop-prep

undefined: iface

Check failure on line 561 in core/commands/pin/pin.go

View workflow job for this annotation

GitHub Actions / go-lint

undefined: iface

Check failure on line 561 in core/commands/pin/pin.go

View workflow job for this annotation

GitHub Actions / go-test

undefined: iface
lsCtx, cancel := context.WithCancel(req.Context)
defer cancel()

go func() {
lsErr = api.Pin().Ls(lsCtx, opt, pins, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))

Check failure on line 566 in core/commands/pin/pin.go

View workflow job for this annotation

GitHub Actions / go-check

cannot use opt (variable of type "github.com/ipfs/kubo/core/coreiface/options".PinLsOption) as chan<- iface.Pin value in argument to api.Pin().Ls

Check failure on line 566 in core/commands/pin/pin.go

View workflow job for this annotation

GitHub Actions / go-build

cannot use opt (variable of type "github.com/ipfs/kubo/core/coreiface/options".PinLsOption) as chan<- iface.Pin value in argument to api.Pin().Ls

Check failure on line 566 in core/commands/pin/pin.go

View workflow job for this annotation

GitHub Actions / interop-prep

cannot use opt (variable of type "github.com/ipfs/kubo/core/coreiface/options".PinLsOption) as chan<- iface.Pin value in argument to api.Pin().Ls

Check failure on line 566 in core/commands/pin/pin.go

View workflow job for this annotation

GitHub Actions / go-lint

cannot use opt (variable of type "github.com/ipfs/kubo/core/coreiface/options".PinLsOption) as chan<- iface.Pin value in argument to api.Pin().Ls) (typecheck)

Check failure on line 566 in core/commands/pin/pin.go

View workflow job for this annotation

GitHub Actions / go-test

cannot use opt (variable of type "github.com/ipfs/kubo/core/coreiface/options".PinLsOption) as chan<- iface.Pin value in argument to api.Pin().Ls
}()

for p := range pins {
err = emit(PinLsOutputWrapper{
PinLsObject: PinLsObject{
Expand All @@ -570,7 +578,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
return err
}
}
return <-errCh
return lsErr
}

const (
Expand Down
20 changes: 6 additions & 14 deletions core/commands/pin/remotepin.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states.
}

// Executes GET /pins/?query-with-filters
func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (<-chan pinclient.PinStatusGetter, <-chan error) {
func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client, out chan<- pinclient.PinStatusGetter) error {
opts := []pinclient.LsOption{}
if name, nameFound := req.Options[pinNameOptionName]; nameFound {
nameStr := name.(string)
Expand All @@ -326,12 +326,8 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (<-ch
for _, rawCID := range cidsRawArr {
parsedCID, err := cid.Decode(rawCID)
if err != nil {
psCh := make(chan pinclient.PinStatusGetter)
errCh := make(chan error, 1)
errCh <- fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err)
close(psCh)
close(errCh)
return psCh, errCh
close(out)
return fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err)
}
parsedCIDs = append(parsedCIDs, parsedCID)
}
Expand All @@ -343,19 +339,15 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (<-ch
for _, rawStatus := range statusRawArr {
s := pinclient.Status(rawStatus)
if s.String() == string(pinclient.StatusUnknown) {
psCh := make(chan pinclient.PinStatusGetter)
errCh := make(chan error, 1)
errCh <- fmt.Errorf("status %q is not valid", rawStatus)
close(psCh)
close(errCh)
return psCh, errCh
close(out)
return fmt.Errorf("status %q is not valid", rawStatus)
}
parsedStatuses = append(parsedStatuses, s)
}
opts = append(opts, pinclient.PinOpts.FilterStatus(parsedStatuses...))
}

return c.Ls(ctx, opts...)
return c.Ls(ctx, out, opts...)

Check failure on line 350 in core/commands/pin/remotepin.go

View workflow job for this annotation

GitHub Actions / go-check

too many return values

Check failure on line 350 in core/commands/pin/remotepin.go

View workflow job for this annotation

GitHub Actions / go-check

too many arguments in call to c.Ls

Check failure on line 350 in core/commands/pin/remotepin.go

View workflow job for this annotation

GitHub Actions / go-build

too many return values

Check failure on line 350 in core/commands/pin/remotepin.go

View workflow job for this annotation

GitHub Actions / go-build

too many arguments in call to c.Ls

Check failure on line 350 in core/commands/pin/remotepin.go

View workflow job for this annotation

GitHub Actions / interop-prep

too many return values

Check failure on line 350 in core/commands/pin/remotepin.go

View workflow job for this annotation

GitHub Actions / interop-prep

too many arguments in call to c.Ls

Check failure on line 350 in core/commands/pin/remotepin.go

View workflow job for this annotation

GitHub Actions / go-lint

too many return values

Check failure on line 350 in core/commands/pin/remotepin.go

View workflow job for this annotation

GitHub Actions / go-lint

too many arguments in call to c.Ls

Check failure on line 350 in core/commands/pin/remotepin.go

View workflow job for this annotation

GitHub Actions / go-test

too many return values

Check failure on line 350 in core/commands/pin/remotepin.go

View workflow job for this annotation

GitHub Actions / go-test

too many arguments in call to c.Ls
}

var rmRemotePinCmd = &cmds.Command{
Expand Down
Loading

0 comments on commit 89890b4

Please sign in to comment.