Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sort car urls #172

Merged
merged 12 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 147 additions & 12 deletions cmd-car-split.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"io/fs"
"net/http"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -44,7 +45,8 @@ var (
Roots: []cid.Cid{CBOR_SHA256_DUMMY_CID}, // placeholder
Version: 1,
}
hdrSize, _ = car.HeaderSize(hdr)
hdrSize, _ = car.HeaderSize(hdr)
maxSectionSize = 2 << 20 // 2 MiB
)

const maxLinks = 432000 / 18 // 18 subsets
Expand Down Expand Up @@ -532,11 +534,13 @@ func readHeader(streamBuf *bufio.Reader) ([]byte, int64, error) {
return headerBuf.Bytes(), streamLen, nil
}

func SortCarFiles(carFiles []string) ([]string, error) {
type carFileInfo struct {
path string
firstSlot int64
}
type carFileInfo struct {
name string
firstSlot int64
size int64
}

func SortCarFiles(carFiles []string) ([]carFileInfo, error) {

var fileInfos []carFileInfo

Expand Down Expand Up @@ -584,9 +588,15 @@ func SortCarFiles(carFiles []string) ([]string, error) {
return nil, fmt.Errorf("failed to find root node in file %s", path)
}

fi, err := file.Stat()
if err != nil {
return nil, fmt.Errorf("failed to get file info for %s: %w", path, err)
}

fileInfos = append(fileInfos, carFileInfo{
path: path,
name: path,
firstSlot: int64(subset.First),
size: fi.Size(),
})
}

Expand All @@ -595,11 +605,136 @@ func SortCarFiles(carFiles []string) ([]string, error) {
return fileInfos[i].firstSlot < fileInfos[j].firstSlot
})

// Extract the sorted file paths
sortedFiles := make([]string, len(fileInfos))
for i, info := range fileInfos {
sortedFiles[i] = info.path
return fileInfos, nil
}

func SortCarURLs(carURLs []string) ([]carFileInfo, error) {
var urlInfos []carFileInfo

for _, url := range carURLs {
firstSlot, size, err := getSlotAndSizeFromURL(url)
if err != nil {
return nil, fmt.Errorf("failed to get first slot from URL %s: %w", url, err)
}

urlInfos = append(urlInfos, carFileInfo{
name: url,
firstSlot: firstSlot,
size: size,
})
}

// Sort the URL infos based on the firstSlot
sort.Slice(urlInfos, func(i, j int) bool {
return urlInfos[i].firstSlot < urlInfos[j].firstSlot
})

return urlInfos, nil
}

func getSlotAndSizeFromURL(url string) (int64, int64, error) {
fileSize, err := splitcarfetcher.GetContentSizeWithHeadOrZeroRange(url)
if err != nil {
return 0, 0, fmt.Errorf("failed to get file size: %w", err)
}

rootCID, err := getRootCid(url)
if err != nil {
return 0, 0, fmt.Errorf("failed to get root CID: %w", err)
}

endOffset := getEndOffset(fileSize)

partialContent, err := fetchFromOffset(url, endOffset)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to check error


cidBytes := rootCID.Bytes()
index := bytes.LastIndex(partialContent, cidBytes)
if index == -1 {
return 0, 0, fmt.Errorf("CID block not found in the last 2MiB of the file")
}
blockData := partialContent[index-2:]
r := bufio.NewReader(bytes.NewBuffer(blockData))
cid, _, data, err := carreader.ReadNodeInfoWithData(r)
if err != nil {
return 0, 0, fmt.Errorf("failed to read node info: %w", err)
}
if cid != rootCID {
return 0, 0, fmt.Errorf("expected CID %s, got %s", rootCID, cid)
}

// Decode the Subset
subset, err := iplddecoders.DecodeSubset(data)
if err != nil {
return 0, 0, fmt.Errorf("failed to decode Subset from block: %w", err)
}

return int64(subset.First), fileSize, nil
}

func getRootCid(url string) (cid.Cid, error) {
// Make a GET request for the beginning of the file
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return cid.Undef, fmt.Errorf("failed to create request: %w", err)
}

// Request only the first hdrSize bytes
req.Header.Set("Range", fmt.Sprintf("bytes=0-%d", hdrSize))

resp, err := http.DefaultClient.Do(req)
if err != nil {
return cid.Undef, fmt.Errorf("failed to fetch CAR file header: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusPartialContent {
return cid.Undef, fmt.Errorf("server does not support range requests")
}

// Read the header content
headerContent, err := io.ReadAll(resp.Body)
if err != nil {
return cid.Undef, fmt.Errorf("failed to read header content: %w", err)
}

// Parse the CAR header
rc := io.NopCloser(bytes.NewReader(headerContent))
cr, err := carreader.New(rc)
if err != nil {
return cid.Undef, fmt.Errorf("failed to create CarReader: %w", err)
}

roots := cr.Header.Roots
if len(roots) != 1 {
return cid.Undef, fmt.Errorf("expected 1 root CID, got %d", len(roots))
}
rootCID := roots[0]

return rootCID, nil
}

func getEndOffset(fileSize int64) int64 {
eo := fileSize - int64(maxSectionSize)
return max(eo, 0)
}

func fetchFromOffset(url string, offset int64) ([]byte, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}

req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to fetch CAR file: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusPartialContent {
return nil, fmt.Errorf("server does not support range requests")
}

return sortedFiles, nil
return io.ReadAll(resp.Body)
}
Loading
Loading