Skip to content

Commit

Permalink
added directory support
Browse files Browse the repository at this point in the history
  • Loading branch information
rotarur committed Oct 16, 2023
1 parent b91deb2 commit a1b0018
Show file tree
Hide file tree
Showing 3 changed files with 370 additions and 183 deletions.
194 changes: 156 additions & 38 deletions internal/sync/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"os"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -73,15 +74,17 @@ func Sync(cmd *cobra.Command) {
log.Printf("Syncing from %s to %s\n", src, dst)

// Create the API URL for the IPFS pin/ls operation
srcURL := src + utils.IPFS_LIST_ENDPOINT
listURL := fmt.Sprintf("%s%s", src, utils.DIR_LIST_ENDPOINT)

// Get the list of all CID's from the source IPFS
// TODO: implement retry backoff with pester
resL, err := utils.PostIPFS(srcURL, nil)
resL, err := utils.PostCID(listURL, nil, "")
if err != nil {
fmt.Println(err)
}
defer resL.Body.Close()

// Create the slice with the CIDS's
scanner := bufio.NewScanner(resL.Body)
for scanner.Scan() {
var j utils.IPFSCIDResponse
Expand All @@ -93,9 +96,6 @@ func Sync(cmd *cobra.Command) {
}
}

// Create the API URL for the IPFS GET
srcGet := fmt.Sprintf("%s%s", src, utils.IPFS_CAT_ENDPOINT)

counter := 1
length := len(cids)

Expand All @@ -113,11 +113,10 @@ func Sync(cmd *cobra.Command) {
wg.Add(1)
go func(c int, cidID string) {
defer wg.Done()
AsyncPostIPFS(srcGet, dst, cidID, &c, length, &failed, &synced)
AsyncCall(src, dst, cidID, &c, length, &failed, &synced)

}(counter, cids[i].Cid)
counter += 1

i++
}

Expand All @@ -130,56 +129,175 @@ func Sync(cmd *cobra.Command) {
log.Printf("Total time: %s\n", time.Since(timeStart))
}

func AsyncPostIPFS(src string, dst string, cidID string, counter *int, length int, failed *int, synced *int) {
// Get IPFS CID from source
srcCID := src + cidID
log.Printf("%d/%d: Syncing the CID: %s\n", *counter, length, cidID)
func AsyncCall(src string, dst string, cidID string, counter *int, length int, failed *int, synced *int) {
// Create the API URL for the IPFS GET
srcGet := fmt.Sprintf("%s%s%s", src, utils.CAT_ENDPOINT, cidID)

utils.PrintLogMessage(*counter, length, cidID, "Syncing")

// Get CID from source
resG, err := utils.GetIPFS(srcCID, nil)
resG, err := utils.GetCID(srcGet, nil)
if err != nil {
log.Printf("%d/%d: %s; CID: %s", *counter, length, err, cidID)
*failed += 1
*counter += 1
// Check if it's a directory
if strings.Contains(fmt.Sprintf("%s", err), utils.DIR_ERROR) {
err := syncDir(src, dst, cidID, cidID)
if err != nil {
utils.PrintLogMessage(*counter, length, cidID, fmt.Sprintf("%s", err))
*failed += 1
*counter += 1
} else {
utils.PrintLogMessage(*counter, length, cidID, "Successfully synced directory")
}
} else {
utils.PrintLogMessage(*counter, length, cidID, fmt.Sprintf("%s", err))
*failed += 1
*counter += 1
}
return
}
defer resG.Body.Close()

cidV := utils.GetCIDVersion(cidID)
// Create the API URL fo the POST on destination
apiADD := fmt.Sprintf("%s%s?cid-version=%s", dst, utils.IPFS_PIN_ENDPOINT, cidV)

newBody, err := utils.GetHTTPBody(resG)
payload, err := utils.ParseHTTPBody(resG)
if err != nil {
log.Printf("%d/%d: %s", *counter, length, err)
utils.PrintLogMessage(*counter, length, cidID, fmt.Sprintf("%s", err))
}

// Sync IPFS CID into destination
// TODO: implement retry backoff with pester
var m utils.IPFSResponse
resP, err := utils.PostIPFS(apiADD, newBody)
err = syncCall(src, dst, cidID, "", "", payload)
if err != nil {
log.Printf("%d/%d: %s", *counter, length, err)
utils.PrintLogMessage(*counter, length, cidID, fmt.Sprintf("%s", err))
*failed += 1
} else {
defer resP.Body.Close()
}

// Print success message
utils.PrintLogMessage(*counter, length, cidID, "Successfully synced")
*synced += 1
}

// Generic function to parse the response and create a struct
err := utils.UnmarshalToStruct[utils.IPFSResponse](resP.Body, &m)
func syncCall(src, dst, cid, parentCid, filePath string, payload []byte) error {
// We need to get the body if this was a fresh call
if len(payload) == 0 {
// Create the API URL for the IPFS GET
srcGet := fmt.Sprintf("%s%s%s", src, utils.CAT_ENDPOINT, cid)

// Get CID from source
resG, err := utils.GetCID(srcGet, nil)
if err != nil {
return err
}
defer resG.Body.Close()

payload, err = utils.ParseHTTPBody(resG)
if err != nil {
log.Printf("%d/%d: %s", *counter, length, err)
return err
}
}
cidV := utils.GetCIDVersion(cid)

var apiADD string
if len(filePath) != 0 {
// Create the API URL for the directory POST on destination
apiADD = fmt.Sprintf("%s%s?cid-version=%s&wrap-with-directory=1&to-files=1", dst, utils.IPFS_PIN_ENDPOINT, cidV)
} else {
// Create the API URL for the POST on destination
apiADD = fmt.Sprintf("%s%s?cid-version=%s", dst, utils.IPFS_PIN_ENDPOINT, cidV)
}

// Sync IPFS CID into destination
// TODO: implement retry backoff with pester
// log.Printf(filePath)
resP, err := utils.PostCID(apiADD, payload, filePath)
if err != nil {
return err
}
defer resP.Body.Close()

// Generic function to parse the response and create a struct
var m []utils.IPFSResponse
err = utils.UnmarshalIPFSResponse(resP.Body, &m)
if err != nil {
return err
}

// Check if the IPFS Hash is the same as the source one
// If not the syncing didn't work
ok, err := utils.TestIPFSHash(cidID, m.Hash)
ok := false
for _, v := range m {
if len(parentCid) != 0 {
if v.Hash == parentCid {
ok = true
break
}
} else {
if v.Hash == cid {
ok = true
}
}

}

if !ok {
return fmt.Errorf("Can't be synced. The source and destination IPFS Hash differ")
}

return nil
}

func syncDir(src, dst, file, parentCid string) error {
listURL := fmt.Sprintf("%s%s%s", src, utils.DIR_LIST_ENDPOINT, file)

// List directory
lsD, err := utils.GetCID(listURL, nil)
if err != nil {
log.Printf("%d/%d: %s", *counter, length, err)
*failed += 1
} else {
// Print success message
log.Printf("%d/%d: %s", *counter, length, ok)
*synced += 1
return err
}
defer lsD.Body.Close()

// Create the structure with the CID directory
var data utils.Data
err = utils.UnmarshalToStruct[utils.Data](lsD.Body, &data)
if err != nil {
return err
}

// Recursive function to sync all directory content
for _, v := range data.Objects {
err = syncDirContent(src, dst, parentCid, v, true)
if err != nil {
return err
}
}

return nil
}

func syncDirContent(src, dst, parentCID string, data utils.Object, s bool) error {
for _, v := range data.Links {
// Syntax: https://ipfs.com/ipfs/api/v0/cat?arg=QmcoBTSpxyBx2AuUqhuy5X1UrasbLoz76QFGLgqUqhXLK6/foo.txt
filePath := fmt.Sprintf("%s/%s", data.Hash, v.Name)
url := fmt.Sprintf("%s%s%s", src, utils.CAT_ENDPOINT, filePath)

_, err := utils.GetCID(url, nil)
if err != nil {
// Check if it's a directory
// If true, the new source will be like: https://ipfs.com/ipfs/api/v0/cat?arg=QmcoBTSpxyBx2AuUqhuy5X1UrasbLoz76QFGLgqUqhXLK6/FOO
if strings.Contains(fmt.Sprintf("%s", err), utils.DIR_ERROR) {
// The new CID for directory will be like: QmcoBTSpxyBx2AuUqhuy5X1UrasbLoz76QFGLgqUqhXLK6/FOO
filePath := fmt.Sprintf("%s/%s", data.Hash, v.Name)
err := syncDir(src, dst, filePath, v.Hash)
if err != nil {
return err
}
} else {
return err
}
} else {
err = syncCall(src, dst, v.Hash, parentCID, filePath, []byte{})
if err != nil {
return err
}
}

}

return nil
}
51 changes: 36 additions & 15 deletions internal/utils/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,54 @@ package utils

import "net/http"

var IPFS_LIST_ENDPOINT string = "/ipfs/api/v0/pin/ls?stream=true"
var IPFS_CAT_ENDPOINT string = "/ipfs/api/v0/cat?arg="
// var IPFS_PIN_ENDPOINT string = "/ipfs/api/v0/add?stream-channels=true"
var DIR_LIST_ENDPOINT string = "/ipfs/api/v0/ls?arg="
var CAT_ENDPOINT string = "/ipfs/api/v0/cat?arg="
var IPFS_PIN_ENDPOINT string = "/ipfs/api/v0/add"
var HEADER_APP_JSON string = "application/json"

var IPFS_DIR_ERROR = "this dag node is a directory"
var DIR_ERROR = "this dag node is a directory"

type Link struct {
Name string `json:"Name"`
Hash string `json:"Hash"`
Size int `json:"Size"`
Type int `json:"Type"`
Target string `json:"Target"`
}

type Object struct {
Hash string `json:"Hash"`
Links []Link `json:"Links"`
}

type Data struct {
Objects []Object `json:"Objects"`
}

type HTTPResult struct {
HTTPResponse *http.Response `json:"http_response"`
Error error `json:"error"`
Counter int `json:"counter" default:"0"`
HTTPResponse *http.Response `json:"http_response"`
Error error `json:"error"`
Counter int `json:"counter" default:"0"`
}

type IPFSCIDResponse struct {
Cid string `json:"cid"`
Type string `json:"type"`
Cid string `json:"cid"`
Type string `json:"type"`
}

type IPFSResponse struct {
Name string `json:"name"`
Hash string `json:"hash"`
Size string `json:"size"`
Name string `json:"name"`
Hash string `json:"hash"`
Size string `json:"size"`
}

type IPFSErrorResponse struct {
Message string `json:"message"`
Code int `json:"code"`
Type string `json:"type"`
Message string `json:"message"`
Code int `json:"code"`
Type string `json:"type"`
}

type Header struct {
Key string `json:"key"`
Value string `json:"value"`
}
Loading

0 comments on commit a1b0018

Please sign in to comment.