Skip to content

Commit

Permalink
pdp: Support arbitrary hash funcs in uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Oct 24, 2024
1 parent bb53468 commit fe06e34
Show file tree
Hide file tree
Showing 10 changed files with 443 additions and 114 deletions.
210 changes: 169 additions & 41 deletions cmd/pdptool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/x509"
"encoding/hex"
"encoding/json"
"encoding/pem"
"fmt"
"io"
"math/rand"
"net"
"net/http"
"os"
"strconv"
Expand All @@ -18,6 +20,7 @@ import (

"github.com/fatih/color"
"github.com/golang-jwt/jwt/v4"
"github.com/minio/sha256-simd"
"github.com/urfave/cli/v2"

commcid "github.com/filecoin-project/go-fil-commcid"
Expand Down Expand Up @@ -295,8 +298,24 @@ var piecePrepareCmd = &cli.Command{
return fmt.Errorf("failed to compute piece CID: %v", err)
}

// now compute sha256
if _, err := file.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("failed to seek file: %v", err)
}

// Create commp calculator
h := sha256.New()
_, err = io.Copy(h, file)
if err != nil {
return fmt.Errorf("failed to read input file: %v", err)
}

// Finalize digest
shadigest := h.Sum(nil)

// Output the piece CID and size
fmt.Printf("Piece CID: %s\n", pieceCIDComputed)
fmt.Printf("SHA256: %x\n", shadigest)
fmt.Printf("Padded Piece Size: %d bytes\n", paddedPieceSize)
fmt.Printf("Raw Piece Size: %d bytes\n", pieceSize)

Expand Down Expand Up @@ -327,6 +346,15 @@ var pieceUploadCmd = &cli.Command{
Usage: "Notification URL",
Required: false,
},
&cli.StringFlag{
Name: "hash-type",
Usage: "Hash type to use for verification (sha256 or commp)",
Value: "sha256",
},
&cli.BoolFlag{
Name: "local-notif-wait",
Usage: "Wait for server notification by spawning a temporary local HTTP server",
},
},
Action: func(cctx *cli.Context) error {
inputFile := cctx.Args().Get(0)
Expand All @@ -338,6 +366,8 @@ var pieceUploadCmd = &cli.Command{
jwtToken := cctx.String("jwt-token")
notifyURL := cctx.String("notify-url")
serviceName := cctx.String("service-name")
hashType := cctx.String("hash-type")
localNotifWait := cctx.Bool("local-notif-wait")

if jwtToken == "" {
if serviceName == "" {
Expand All @@ -354,37 +384,120 @@ var pieceUploadCmd = &cli.Command{
}
}

// First, compute the PieceCID
if hashType != "sha256" && hashType != "commp" {
return fmt.Errorf("invalid hash type: %s", hashType)
}

if localNotifWait && notifyURL != "" {
return fmt.Errorf("cannot specify both --notify-url and --local-notif-wait")
}

var notifyReceived chan struct{}
var server *http.Server
var ln net.Listener

if localNotifWait {
notifyReceived = make(chan struct{})
var err error
ln, err = net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return fmt.Errorf("failed to start local HTTP server: %v", err)
}
serverAddr := fmt.Sprintf("http://%s/notify", ln.Addr().String())
notifyURL = serverAddr

mux := http.NewServeMux()
mux.HandleFunc("/notify", func(w http.ResponseWriter, r *http.Request) {
fmt.Println("Received notification from server.")
b, err := io.ReadAll(r.Body)
if err != nil {
fmt.Printf("Failed to read notification body: %v\n", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
fmt.Printf("Notification body: %s\n", string(b))
w.WriteHeader(http.StatusOK)
// Signal that notification was received
close(notifyReceived)
})

server = &http.Server{Handler: mux}

go func() {
if err := server.Serve(ln); err != nil && err != http.ErrServerClosed {
fmt.Printf("HTTP server error: %v\n", err)
}
}()

defer func() {
server.Close()
ln.Close()
}()
}

// Open input file
file, err := os.Open(inputFile)
if err != nil {
return fmt.Errorf("failed to open input file: %v", err)
}
defer file.Close()

cp := &commp.Calc{}
// Get the piece size
fi, err := file.Stat()
if err != nil {
return fmt.Errorf("failed to stat input file: %v", err)
}
pieceSize := fi.Size()

// Copy data into commp calculator
// Compute CommP (PieceCID)
cp := &commp.Calc{}
_, err = io.Copy(cp, file)
if err != nil {
return fmt.Errorf("failed to read input file: %v", err)
}

// Finalize digest
digest, _, err := cp.Digest()
commpDigest, _, err := cp.Digest()
if err != nil {
return fmt.Errorf("failed to compute digest: %v", err)
}

// Convert digest to CID
pieceCIDComputed, err := commcid.DataCommitmentV1ToCID(digest)
// Compute SHA256
if _, err := file.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("failed to seek file: %v", err)
}

h := sha256.New()
_, err = io.Copy(h, file)
if err != nil {
return fmt.Errorf("failed to compute piece CID: %v", err)
return fmt.Errorf("failed to read input file: %v", err)
}

shadigest := h.Sum(nil)

// Prepare the check data
var checkData map[string]interface{}

switch hashType {
case "sha256":
checkData = map[string]interface{}{
"name": "sha2-256",
"hash": hex.EncodeToString(shadigest),
"size": pieceSize,
}
case "commp":
hashHex := hex.EncodeToString(commpDigest)
checkData = map[string]interface{}{
"name": "sha2-256-trunc254-padded",
"hash": hashHex,
"size": pieceSize,
}
default:
return fmt.Errorf("unsupported hash type: %s", hashType)
}

// Send POST /pdp/piece to the PDP service
// Prepare the request data
reqData := map[string]interface{}{
"pieceCid": pieceCIDComputed.String(),
"check": checkData,
}
if notifyURL != "" {
reqData["notify"] = notifyURL
Expand All @@ -409,44 +522,59 @@ var pieceUploadCmd = &cli.Command{
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusNoContent {
fmt.Println("Piece already exists on the server.")
if resp.StatusCode == http.StatusOK {
// Piece already exists, get the pieceCID from the response
var respData map[string]string
err = json.NewDecoder(resp.Body).Decode(&respData)
if err != nil {
return fmt.Errorf("failed to parse response: %v", err)
}
pieceCID := respData["pieceCID"]
fmt.Printf("Piece already exists on the server. Piece CID: %s\n", pieceCID)
return nil
} else if resp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("server returned status code %d: %s", resp.StatusCode, string(body))
}
} else if resp.StatusCode == http.StatusCreated {
// Get the upload URL from the Location header
uploadURL := resp.Header.Get("Location")
if uploadURL == "" {
return fmt.Errorf("server did not provide upload URL in Location header")
}

// Get the upload URL from the Location header
uploadURL := resp.Header.Get("Location")
if uploadURL == "" {
return fmt.Errorf("server did not provide upload URL in Location header")
}
// Upload the piece data via PUT
if _, err := file.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("failed to seek file: %v", err)
}
uploadReq, err := http.NewRequest("PUT", serviceURL+uploadURL, file)
if err != nil {
return fmt.Errorf("failed to create upload request: %v", err)
}
// Set the Content-Length header
uploadReq.ContentLength = pieceSize
// Set the Content-Type header
uploadReq.Header.Set("Content-Type", "application/octet-stream")

// Upload the piece data via PUT
_, err = file.Seek(0, io.SeekStart) // Reset file pointer to the beginning
if err != nil {
return fmt.Errorf("failed to seek file: %v", err)
}
uploadReq, err := http.NewRequest("PUT", serviceURL+uploadURL, file)
if err != nil {
return fmt.Errorf("failed to create upload request: %v", err)
}
uploadResp, err := client.Do(uploadReq)
if err != nil {
return fmt.Errorf("failed to upload piece data: %v", err)
}
defer uploadResp.Body.Close()

uploadResp, err := client.Do(uploadReq)
if err != nil {
return fmt.Errorf("failed to upload piece data: %v", err)
}
defer uploadResp.Body.Close()
if uploadResp.StatusCode != http.StatusNoContent {
body, _ := io.ReadAll(uploadResp.Body)
return fmt.Errorf("upload failed with status code %d: %s", uploadResp.StatusCode, string(body))
}

if uploadResp.StatusCode != http.StatusNoContent {
body, _ := io.ReadAll(uploadResp.Body)
return fmt.Errorf("upload failed with status code %d: %s", uploadResp.StatusCode, string(body))
}
fmt.Println("Piece uploaded successfully.")

fmt.Println("Piece uploaded successfully.")
if localNotifWait {
fmt.Println("Waiting for server notification...")
<-notifyReceived
}

return nil
return nil
} else {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("server returned status code %d: %s", resp.StatusCode, string(body))
}
},
}

Expand Down
5 changes: 0 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1048,9 +1048,6 @@ github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY=
github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU=
github.com/mmcloughlin/profile v0.1.1/go.mod h1:IhHD7q1ooxgwTgjxQYkACGA77oFTDdFVejUS1/tS/qU=
github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/pointerstructure v1.2.0 h1:O+i9nHnXS3l/9Wu7r4NrEdwA2VFTicjUEN1uBnDo34A=
Expand Down Expand Up @@ -1758,8 +1755,6 @@ golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
13 changes: 12 additions & 1 deletion harmony/harmonydb/sql/20240930-pdp.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ CREATE TABLE pdp_piece_uploads (
id UUID PRIMARY KEY NOT NULL,
service TEXT NOT NULL, -- pdp_services.id

piece_cid TEXT NOT NULL, -- piece cid v2
check_hash_codec TEXT NOT NULL, -- hash multicodec used for checking the piece
check_hash BYTEA NOT NULL, -- hash of the piece
check_size BIGINT NOT NULL, -- size of the piece

piece_cid TEXT, -- piece cid v2
notify_url TEXT NOT NULL, -- URL to notify when piece is ready

notify_task_id BIGINT, -- harmonytask task ID, moves to pdp_piecerefs and calls notify_url when piece is ready
Expand Down Expand Up @@ -54,6 +58,13 @@ CREATE TABLE pdp_piecerefs (
FOREIGN KEY (piece_ref) REFERENCES parked_piece_refs(ref_id) ON DELETE CASCADE
);

-- PDP hash to piece cid mapping
CREATE TABLE pdp_piece_mh_to_commp (
mhash BYTEA PRIMARY KEY,
size BIGINT NOT NULL,
commp TEXT NOT NULL
);

CREATE INDEX pdp_piecerefs_piece_cid_idx ON pdp_piecerefs(piece_cid);

-- PDP proofsets we maintain
Expand Down
22 changes: 22 additions & 0 deletions harmony/taskhelp/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package taskhelp

import (
"math"
"time"
)

type RetryWaitFunc func(retries int) time.Duration

// RetryWaitLinear returns a function that calculates a linearly increasing duration based on retries.
func RetryWaitLinear(initial, increment time.Duration) RetryWaitFunc {
return func(retries int) time.Duration {
return initial + time.Duration(retries)*increment
}
}

// RetryWaitExp returns a function that calculates an exponentially increasing duration based on retries.
func RetryWaitExp(initial time.Duration, factor float64) RetryWaitFunc {
return func(retries int) time.Duration {
return time.Duration(float64(initial) * math.Pow(factor, float64(retries)))
}
}
6 changes: 4 additions & 2 deletions pdp/contract/addresses.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package contract

import (
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/filecoin-project/lotus/chain/types"
"github.com/snadrus/must"
"math/big"

"github.com/filecoin-project/curio/build"

"github.com/filecoin-project/lotus/chain/types"
)

type PDPContracts struct {
Expand Down
3 changes: 2 additions & 1 deletion pdp/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/json"
"errors"
"fmt"
types2 "github.com/filecoin-project/lotus/chain/types"
"io"
"math/big"
"net/http"
Expand All @@ -30,6 +29,8 @@ import (
"github.com/filecoin-project/curio/lib/paths"
"github.com/filecoin-project/curio/pdp/contract"
"github.com/filecoin-project/curio/tasks/message"

types2 "github.com/filecoin-project/lotus/chain/types"
)

// PDPRoutePath is the base path for PDP routes
Expand Down
Loading

0 comments on commit fe06e34

Please sign in to comment.