diff --git a/internal/sync/main.go b/internal/sync/main.go index 51cbd01..75095ae 100644 --- a/internal/sync/main.go +++ b/internal/sync/main.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "os" + "strings" "sync" "time" @@ -73,15 +74,17 @@ func Sync(cmd *cobra.Command) { log.Printf("Syncing from %s to %s\n", src, dst) // Create the API URL for the IPFS pin/ls operation - srcURL := src + utils.IPFS_LIST_ENDPOINT + listURL := fmt.Sprintf("%s%s", src, utils.DIR_LIST_ENDPOINT) // Get the list of all CID's from the source IPFS // TODO: implement retry backoff with pester - resL, err := utils.PostIPFS(srcURL, nil) + resL, err := utils.PostCID(listURL, nil, "") if err != nil { fmt.Println(err) } + defer resL.Body.Close() + // Create the slice with the CIDS's scanner := bufio.NewScanner(resL.Body) for scanner.Scan() { var j utils.IPFSCIDResponse @@ -93,9 +96,6 @@ func Sync(cmd *cobra.Command) { } } - // Create the API URL for the IPFS GET - srcGet := fmt.Sprintf("%s%s", src, utils.IPFS_CAT_ENDPOINT) - counter := 1 length := len(cids) @@ -113,11 +113,10 @@ func Sync(cmd *cobra.Command) { wg.Add(1) go func(c int, cidID string) { defer wg.Done() - AsyncPostIPFS(srcGet, dst, cidID, &c, length, &failed, &synced) + AsyncCall(src, dst, cidID, &c, length, &failed, &synced) }(counter, cids[i].Cid) counter += 1 - i++ } @@ -130,56 +129,175 @@ func Sync(cmd *cobra.Command) { log.Printf("Total time: %s\n", time.Since(timeStart)) } -func AsyncPostIPFS(src string, dst string, cidID string, counter *int, length int, failed *int, synced *int) { - // Get IPFS CID from source - srcCID := src + cidID - log.Printf("%d/%d: Syncing the CID: %s\n", *counter, length, cidID) +func AsyncCall(src string, dst string, cidID string, counter *int, length int, failed *int, synced *int) { + // Create the API URL for the IPFS GET + srcGet := fmt.Sprintf("%s%s%s", src, utils.CAT_ENDPOINT, cidID) + + utils.PrintLogMessage(*counter, length, cidID, "Syncing") // Get CID from source - resG, err := utils.GetIPFS(srcCID, nil) + resG, err := utils.GetCID(srcGet, nil) if err != nil { - log.Printf("%d/%d: %s; CID: %s", *counter, length, err, cidID) - *failed += 1 - *counter += 1 + // Check if it's a directory + if strings.Contains(fmt.Sprintf("%s", err), utils.DIR_ERROR) { + err := syncDir(src, dst, cidID, cidID) + if err != nil { + utils.PrintLogMessage(*counter, length, cidID, fmt.Sprintf("%s", err)) + *failed += 1 + *counter += 1 + } else { + utils.PrintLogMessage(*counter, length, cidID, "Successfully synced directory") + } + } else { + utils.PrintLogMessage(*counter, length, cidID, fmt.Sprintf("%s", err)) + *failed += 1 + *counter += 1 + } return } defer resG.Body.Close() - cidV := utils.GetCIDVersion(cidID) - // Create the API URL fo the POST on destination - apiADD := fmt.Sprintf("%s%s?cid-version=%s", dst, utils.IPFS_PIN_ENDPOINT, cidV) - - newBody, err := utils.GetHTTPBody(resG) + payload, err := utils.ParseHTTPBody(resG) if err != nil { - log.Printf("%d/%d: %s", *counter, length, err) + utils.PrintLogMessage(*counter, length, cidID, fmt.Sprintf("%s", err)) } - // Sync IPFS CID into destination - // TODO: implement retry backoff with pester - var m utils.IPFSResponse - resP, err := utils.PostIPFS(apiADD, newBody) + err = syncCall(src, dst, cidID, "", "", payload) if err != nil { - log.Printf("%d/%d: %s", *counter, length, err) + utils.PrintLogMessage(*counter, length, cidID, fmt.Sprintf("%s", err)) *failed += 1 - } else { - defer resP.Body.Close() + } + + // Print success message + utils.PrintLogMessage(*counter, length, cidID, "Successfully synced") + *synced += 1 +} - // Generic function to parse the response and create a struct - err := utils.UnmarshalToStruct[utils.IPFSResponse](resP.Body, &m) +func syncCall(src, dst, cid, parentCid, filePath string, payload []byte) error { + // We need to get the body if this was a fresh call + if len(payload) == 0 { + // Create the API URL for the IPFS GET + srcGet := fmt.Sprintf("%s%s%s", src, utils.CAT_ENDPOINT, cid) + + // Get CID from source + resG, err := utils.GetCID(srcGet, nil) + if err != nil { + return err + } + defer resG.Body.Close() + + payload, err = utils.ParseHTTPBody(resG) if err != nil { - log.Printf("%d/%d: %s", *counter, length, err) + return err } } + cidV := utils.GetCIDVersion(cid) + + var apiADD string + if len(filePath) != 0 { + // Create the API URL for the directory POST on destination + apiADD = fmt.Sprintf("%s%s?cid-version=%s&wrap-with-directory=1&to-files=1", dst, utils.IPFS_PIN_ENDPOINT, cidV) + } else { + // Create the API URL for the POST on destination + apiADD = fmt.Sprintf("%s%s?cid-version=%s", dst, utils.IPFS_PIN_ENDPOINT, cidV) + } + + // Sync IPFS CID into destination + // TODO: implement retry backoff with pester + // log.Printf(filePath) + resP, err := utils.PostCID(apiADD, payload, filePath) + if err != nil { + return err + } + defer resP.Body.Close() + + // Generic function to parse the response and create a struct + var m []utils.IPFSResponse + err = utils.UnmarshalIPFSResponse(resP.Body, &m) + if err != nil { + return err + } // Check if the IPFS Hash is the same as the source one // If not the syncing didn't work - ok, err := utils.TestIPFSHash(cidID, m.Hash) + ok := false + for _, v := range m { + if len(parentCid) != 0 { + if v.Hash == parentCid { + ok = true + break + } + } else { + if v.Hash == cid { + ok = true + } + } + + } + + if !ok { + return fmt.Errorf("Can't be synced. The source and destination IPFS Hash differ") + } + + return nil +} + +func syncDir(src, dst, file, parentCid string) error { + listURL := fmt.Sprintf("%s%s%s", src, utils.DIR_LIST_ENDPOINT, file) + + // List directory + lsD, err := utils.GetCID(listURL, nil) if err != nil { - log.Printf("%d/%d: %s", *counter, length, err) - *failed += 1 - } else { - // Print success message - log.Printf("%d/%d: %s", *counter, length, ok) - *synced += 1 + return err } + defer lsD.Body.Close() + + // Create the structure with the CID directory + var data utils.Data + err = utils.UnmarshalToStruct[utils.Data](lsD.Body, &data) + if err != nil { + return err + } + + // Recursive function to sync all directory content + for _, v := range data.Objects { + err = syncDirContent(src, dst, parentCid, v, true) + if err != nil { + return err + } + } + + return nil +} + +func syncDirContent(src, dst, parentCID string, data utils.Object, s bool) error { + for _, v := range data.Links { + // Syntax: https://ipfs.com/ipfs/api/v0/cat?arg=QmcoBTSpxyBx2AuUqhuy5X1UrasbLoz76QFGLgqUqhXLK6/foo.txt + filePath := fmt.Sprintf("%s/%s", data.Hash, v.Name) + url := fmt.Sprintf("%s%s%s", src, utils.CAT_ENDPOINT, filePath) + + _, err := utils.GetCID(url, nil) + if err != nil { + // Check if it's a directory + // If true, the new source will be like: https://ipfs.com/ipfs/api/v0/cat?arg=QmcoBTSpxyBx2AuUqhuy5X1UrasbLoz76QFGLgqUqhXLK6/FOO + if strings.Contains(fmt.Sprintf("%s", err), utils.DIR_ERROR) { + // The new CID for directory will be like: QmcoBTSpxyBx2AuUqhuy5X1UrasbLoz76QFGLgqUqhXLK6/FOO + filePath := fmt.Sprintf("%s/%s", data.Hash, v.Name) + err := syncDir(src, dst, filePath, v.Hash) + if err != nil { + return err + } + } else { + return err + } + } else { + err = syncCall(src, dst, v.Hash, parentCID, filePath, []byte{}) + if err != nil { + return err + } + } + + } + + return nil } diff --git a/internal/utils/types.go b/internal/utils/types.go index ae33d2a..2bf7add 100644 --- a/internal/utils/types.go +++ b/internal/utils/types.go @@ -2,33 +2,54 @@ package utils import "net/http" -var IPFS_LIST_ENDPOINT string = "/ipfs/api/v0/pin/ls?stream=true" -var IPFS_CAT_ENDPOINT string = "/ipfs/api/v0/cat?arg=" -// var IPFS_PIN_ENDPOINT string = "/ipfs/api/v0/add?stream-channels=true" +var DIR_LIST_ENDPOINT string = "/ipfs/api/v0/ls?arg=" +var CAT_ENDPOINT string = "/ipfs/api/v0/cat?arg=" var IPFS_PIN_ENDPOINT string = "/ipfs/api/v0/add" var HEADER_APP_JSON string = "application/json" -var IPFS_DIR_ERROR = "this dag node is a directory" +var DIR_ERROR = "this dag node is a directory" + +type Link struct { + Name string `json:"Name"` + Hash string `json:"Hash"` + Size int `json:"Size"` + Type int `json:"Type"` + Target string `json:"Target"` +} + +type Object struct { + Hash string `json:"Hash"` + Links []Link `json:"Links"` +} + +type Data struct { + Objects []Object `json:"Objects"` +} type HTTPResult struct { - HTTPResponse *http.Response `json:"http_response"` - Error error `json:"error"` - Counter int `json:"counter" default:"0"` + HTTPResponse *http.Response `json:"http_response"` + Error error `json:"error"` + Counter int `json:"counter" default:"0"` } type IPFSCIDResponse struct { - Cid string `json:"cid"` - Type string `json:"type"` + Cid string `json:"cid"` + Type string `json:"type"` } type IPFSResponse struct { - Name string `json:"name"` - Hash string `json:"hash"` - Size string `json:"size"` + Name string `json:"name"` + Hash string `json:"hash"` + Size string `json:"size"` } type IPFSErrorResponse struct { - Message string `json:"message"` - Code int `json:"code"` - Type string `json:"type"` + Message string `json:"message"` + Code int `json:"code"` + Type string `json:"type"` +} + +type Header struct { + Key string `json:"key"` + Value string `json:"value"` } diff --git a/internal/utils/utils.go b/internal/utils/utils.go index fe4c60f..1787c8d 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -7,162 +7,195 @@ import ( "fmt" "io" "io/ioutil" + "log" "mime/multipart" "net/http" + "net/url" "os" - "path/filepath" "strings" "time" ) -func ValidateEndpoints (src string, dst string) error { - if src == dst { - return fmt.Errorf("Error: The specified source <%s> is the same as the destination <%s>", src, dst) - } - return nil -} +func createTempDirWithFile(f []string) (*os.File, error) { + dir := f[:len(f)-1] + // Create the directories for the CID structure + err := os.MkdirAll(strings.Join(dir, "/"), 0755) + if err != nil { + return nil, err + } -// GenerateTempFileName generates a temporary file name. -func GenerateTempFileName(prefix, suffix string) string { - return filepath.Join(os.TempDir(), fmt.Sprintf("%s%d%s", prefix, time.Now().UnixNano(), suffix)) + file, err := os.Create(fmt.Sprintf("%s", strings.Join(f, "/"))) + if err != nil { + return nil, err + } + + return file, nil } -func GetIPFS(url string, payload io.Reader) (*http.Response, error) { - req, err := http.NewRequest(http.MethodGet, url, payload) - if err != nil { - return nil, fmt.Errorf("Error creating HTTP request: %s", err) - } - - // Set custom User-Agent for cloudflare WAF policies - req.Header.Set("User-Agent", "graphprotocol/ipfs-mgm") - - // Create an HTTP client - client := &http.Client{} - - res, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("Error making API request: %s", err) - } - - if s := res.Status; strings.HasPrefix(s, "5") || strings.HasPrefix(s, "4") { - // Check if the error is due to the CID being a directory - var dirIPFS IPFSErrorResponse - _ = UnmarshalToStruct[IPFSErrorResponse](res.Body, &dirIPFS) - if dirIPFS.Message == IPFS_DIR_ERROR { - return nil, fmt.Errorf("Cannot get this IPFS CID. Error message: %s", dirIPFS.Message) - } else { - return nil, fmt.Errorf("There was an error with the request. Error code: HTTP %s", s) - } - } - - return res, nil +func GetCID(url string, payload io.Reader) (*http.Response, error) { + req, err := http.NewRequest(http.MethodGet, url, payload) + if err != nil { + return nil, fmt.Errorf("Error creating HTTP request: %s", err) + } + + // Set custom User-Agent for cloudflare WAF policies + req.Header.Set("User-Agent", "graphprotocol/ipfs-mgm") + + // Create an HTTP client + client := &http.Client{} + + res, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("Error making API request: %s", err) + } + + if s := res.Status; strings.HasPrefix(s, "5") || strings.HasPrefix(s, "4") { + // Check if the error is due to the CID being a directory + var dirIPFS IPFSErrorResponse + _ = UnmarshalToStruct[IPFSErrorResponse](res.Body, &dirIPFS) + if dirIPFS.Message == DIR_ERROR { + return nil, fmt.Errorf("Cannot get this IPFS CID. Error message: %s", dirIPFS.Message) + } else { + return nil, fmt.Errorf("There was an error with the request. Error code: HTTP %s", s) + } + } + + return res, nil } -func PostIPFS(url string, payload []byte) (*http.Response, error) { - // Generate a unique temporary file name - tempFileName := GenerateTempFileName("ipfs-data-", ".tmp") - - // Create a temporary file to store the IPFS object data - tempFile, err := os.Create(tempFileName) - if err != nil { - return nil, fmt.Errorf("Error creating temporary file: %s", err) - } - defer tempFile.Close() - - // Write the IPFS object data to the temporary file - _, err = tempFile.Write(payload) - if err != nil { - return nil, fmt.Errorf("Error writing data to temporary file: %s", err) - } - - // Create a new HTTP POST request to add the file to the destination - body := &bytes.Buffer{} - writer := multipart.NewWriter(body) - filePart, err := writer.CreateFormFile("file", filepath.Base(tempFileName)) - if err != nil { - return nil, fmt.Errorf("Error creating form file: %s", err) - } - - // Reset the temporary file pointer to the beginning - tempFile.Seek(0, 0) - - // Copy the temporary file data into the form file - _, err = io.Copy(filePart, tempFile) - if err != nil { - return nil, fmt.Errorf("Error copying file data: %s", err) - } - - writer.Close() // Close the multipart writer - - req, err := http.NewRequest(http.MethodPost, url, body) - if err != nil { - return nil, fmt.Errorf("There was an error creating the HTTP request: %s", err) - } - - // Set custom User-Agent for cloudflare WAF policies - req.Header.Set("User-Agent", "graphprotocol/ipfs-mgm") - // req.Header.Set("Content-Type", "text/plain") - req.Header.Set("Content-Type", writer.FormDataContentType()) - - // Create an HTTP client - client := &http.Client{} - - res, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("Error making API request: %s", err) - } - - if s := res.Status; strings.HasPrefix(s, "5") || strings.HasPrefix(s, "4") { - return nil, fmt.Errorf("The endpoint responded with: HTTP %s", s) - } - - return res, nil +func PostCID(dst string, payload []byte, fPath string) (*http.Response, error) { + var tempFileName []string + var base string + if len(fPath) != 0 { + tempFileName = strings.Split(fPath, "/") + base = tempFileName[0] + // Fix the nested directories + if len(tempFileName) > 2 { + tempFileName = tempFileName[1:] + base = tempFileName[0] + } + } else { + // Generate a unique temporary file name + base = fmt.Sprintf("%d", time.Now().UnixNano()) + tempFileName = []string{base, "ipfs-data.tmp"} + } + + // Create a temporary file to store the IPFS object data + tempFile, err := createTempDirWithFile(tempFileName) + if err != nil { + return nil, fmt.Errorf("Error creating temporary file: %s", err) + } + defer tempFile.Close() + + // Write the IPFS object data to the temporary file + _, err = tempFile.Write(payload) + if err != nil { + return nil, fmt.Errorf("Error writing data to temporary file: %s", err) + } + + // Create a new HTTP POST request to add the file to the destination + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + filePart, err := writer.CreateFormFile("file", strings.Join(tempFileName, "/")) + if err != nil { + return nil, fmt.Errorf("Error creating form file: %s", err) + } + + // Reset the temporary file pointer to the beginning + tempFile.Seek(0, 0) + + // Copy the temporary file data into the form file + _, err = io.Copy(filePart, tempFile) + if err != nil { + return nil, fmt.Errorf("Error copying file data: %s", err) + } + + writer.Close() // Close the multipart writer + + req, err := http.NewRequest(http.MethodPost, dst, body) + if err != nil { + return nil, fmt.Errorf("There was an error creating the HTTP request: %s", err) + } + + // Set custom User-Agent for cloudflare WAF policies + req.Header.Set("User-Agent", "graphprotocol/ipfs-mgm") + req.Header.Set("Content-Type", writer.FormDataContentType()) + + // Set Directory Headers + if len(fPath) != 0 { + f := strings.Split(fPath, "/") + fileNameParts := strings.Split(f[len(f)-1], ".") + fileName := fileNameParts[0] + _, a, _ := strings.Cut(fPath, "/") + req.Header.Set("Content-Disposition", fmt.Sprintf("form-data; name=\"%s\"; filename=%s", fileName, url.PathEscape(a))) + req.Header.Set("Abspath", fmt.Sprintf("%s", tempFileName)) + } + defer os.RemoveAll(base) + + // Create an HTTP client + client := &http.Client{} + + res, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("Error making API request: %s", err) + } + + if s := res.Status; strings.HasPrefix(s, "5") || strings.HasPrefix(s, "4") { + return nil, fmt.Errorf("The endpoint responded with: HTTP %s", s) + } + + return res, nil } -func GetHTTPBody(h *http.Response) ([]byte, error) { - // Read the body response - body, err := ioutil.ReadAll(h.Body) - if err != nil { - return nil, fmt.Errorf("Error reading response body: %s", err) - } +func ParseHTTPBody(h *http.Response) ([]byte, error) { + // Read the body response + body, err := ioutil.ReadAll(h.Body) + if err != nil { + return nil, fmt.Errorf("Error reading response body: %s", err) + } - return body, nil + return body, nil } func GetCIDVersion(cid string) string { - if strings.HasPrefix(cid, "Qm") { - return "0" - } + if strings.HasPrefix(cid, "Qm") { + return "0" + } - return "1" + return "1" } -func TestIPFSHash(s string, d string) (string, error) { - if s != d { - return "", fmt.Errorf("The source Hash %s is different from the destination hash %s", s, d) - } +func TestIPFSHash(s string, d string) error { + if s != d { + return fmt.Errorf("The source IPFS Hash is different from the destination Hash%s", "") + } + + return nil +} - return fmt.Sprintf("Successfully synced to destination IPFS, CID: %s", s), nil +func PrintLogMessage(c int, l int, cid string, message string) { + log.Printf("%d/%d (%s): %s", c, l, cid, message) } func SliceToCIDSStruct(s []string) ([]IPFSCIDResponse, error) { - var cids []IPFSCIDResponse - - for _, k := range s{ - var cid IPFSCIDResponse - // create the structure to be unmarshaled from our string - a := fmt.Sprintf(`{"cid":"%s"}`, k) - err := json.Unmarshal([]byte(a), &cid) - if err != nil { - return nil, fmt.Errorf("Error unmarshaling from slice to IPFS Struct: %s", err) - } - cids = append(cids, cid) - } - return cids, nil + var cids []IPFSCIDResponse + + for _, k := range s { + var cid IPFSCIDResponse + // create the structure to be unmarshaled from our string + a := fmt.Sprintf(`{"cid":"%s"}`, k) + err := json.Unmarshal([]byte(a), &cid) + if err != nil { + return nil, fmt.Errorf("Error unmarshaling from slice to IPFS Struct: %s", err) + } + cids = append(cids, cid) + } + return cids, nil } -func UnmarshalToStruct[V IPFSResponse | IPFSCIDResponse | IPFSErrorResponse](h io.ReadCloser, m *V) error { +func UnmarshalToStruct[V Data | IPFSResponse | IPFSCIDResponse | IPFSErrorResponse](h io.ReadCloser, m *V) error { scanner := bufio.NewScanner(h) + for scanner.Scan() { err := json.Unmarshal(scanner.Bytes(), &m) if err != nil { @@ -173,6 +206,22 @@ func UnmarshalToStruct[V IPFSResponse | IPFSCIDResponse | IPFSErrorResponse](h i return nil } +func UnmarshalIPFSResponse(h io.ReadCloser, m *[]IPFSResponse) error { + scanner := bufio.NewScanner(h) + + for scanner.Scan() { + var rm IPFSResponse + err := json.Unmarshal(scanner.Bytes(), &rm) + if err != nil { + return err + } + + *m = append(*m, rm) + } + + return nil +} + func ReadCIDFromFile(f string) ([]string, error) { file, err := os.Open(f) if err != nil { @@ -186,6 +235,5 @@ func ReadCIDFromFile(f string) ([]string, error) { s = append(s, scanner.Text()) } - return s, nil + return s, nil } -