Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rhp4): Add RHP4 listener #523

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type (
SetReadOnly(id int64, readOnly bool) error
RemoveSector(root types.Hash256) error
ResizeCache(size uint32)
Read(types.Hash256) (*[rhp2.SectorSize]byte, error)
ReadSector(types.Hash256) (*[rhp2.SectorSize]byte, error)

// SectorReferences returns the references to a sector
SectorReferences(root types.Hash256) (storage.SectorReference, error)
Expand Down
2 changes: 1 addition & 1 deletion api/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (a *api) handleGETVerifySector(jc jape.Context) {
}

// try to read the sector data and verify the root
data, err := a.volumes.Read(root)
data, err := a.volumes.ReadSector(root)
if err != nil {
resp.Error = err.Error()
} else if calc := rhp2.SectorRoot(data); calc != root {
Expand Down
8 changes: 8 additions & 0 deletions cmd/hostd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ var (
RHP3: config.RHP3{
TCPAddress: ":9983",
},
RHP4: config.RHP4{
ListenAddresses: []config.RHP4ListenAddress{
{
Protocol: "tcp",
Address: ":9984",
},
},
},
Log: config.Log{
Path: os.Getenv(logFileEnvVar), // deprecated. included for compatibility.
Level: "info",
Expand Down
25 changes: 25 additions & 0 deletions cmd/hostd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/coreutils"
"go.sia.tech/coreutils/chain"
rhp4 "go.sia.tech/coreutils/rhp/v4"
"go.sia.tech/coreutils/syncer"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/hostd/alerts"
Expand Down Expand Up @@ -333,6 +334,30 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK
go rhp3.Serve()
defer rhp3.Close()

rhp4 := rhp4.NewServer(hostKey, cm, s, contractManager, wm, sm, vm, rhp4.WithPriceTableValidity(30*time.Minute), rhp4.WithContractProofWindowBuffer(72))

var stopListenerFuncs []func() error
defer func() {
for _, f := range stopListenerFuncs {
if err := f(); err != nil {
log.Error("failed to stop listener", zap.Error(err))
}
}
}()
for _, addr := range cfg.RHP4.ListenAddresses {
switch addr.Protocol {
case "tcp", "tcp4", "tcp6":
l, err := rhp.Listen(addr.Protocol, addr.Address, rhp.WithDataMonitor(dr), rhp.WithReadLimit(rl), rhp.WithWriteLimit(wl))
if err != nil {
return fmt.Errorf("failed to listen on rhp4 addr: %w", err)
}
stopListenerFuncs = append(stopListenerFuncs, l.Close)
go rhp.ServeRHP4SiaMux(l, rhp4, log.Named("rhp4"))
n8maninger marked this conversation as resolved.
Show resolved Hide resolved
default:
return fmt.Errorf("unsupported protocol: %s", addr.Protocol)
}
}

apiOpts := []api.ServerOption{
api.WithAlerts(am),
api.WithLogger(log.Named("api")),
Expand Down
22 changes: 17 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,33 @@ type (
IndexBatchSize int `yaml:"indexBatchSize,omitempty"`
}

// RHP2 contains the configuration for the RHP2 server.
RHP2 struct {
Address string `yaml:"address,omitempty"`
}

// ExplorerData contains the configuration for using an external explorer.
ExplorerData struct {
Disable bool `yaml:"disable,omitempty"`
URL string `yaml:"url,omitempty"`
}

// RHP2 contains the configuration for the RHP2 server.
RHP2 struct {
Address string `yaml:"address,omitempty"`
}

// RHP3 contains the configuration for the RHP3 server.
RHP3 struct {
TCPAddress string `yaml:"tcp,omitempty"`
}

// RHP4ListenAddress contains the configuration for an RHP4 listen address.
RHP4ListenAddress struct {
Protocol string `yaml:"protocol,omitempty"`
Address string `yaml:"address,omitempty"`
}

// RHP4 contains the configuration for the RHP4 server.
RHP4 struct {
ListenAddresses []RHP4ListenAddress `yaml:"listenAddresses,omitempty"`
}

// LogFile configures the file output of the logger.
LogFile struct {
Enabled bool `yaml:"enabled,omitempty"`
Expand Down Expand Up @@ -77,6 +88,7 @@ type (
Explorer ExplorerData `yaml:"explorer,omitempty"`
RHP2 RHP2 `yaml:"rhp2,omitempty"`
RHP3 RHP3 `yaml:"rhp3,omitempty"`
RHP4 RHP4 `yaml:"rhp4,omitempty"`
Log Log `yaml:"log,omitempty"`
}
)
4 changes: 2 additions & 2 deletions host/contracts/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ func (cm *Manager) AccountBalance(account proto4.Account) (types.Currency, error

// CreditAccountsWithContract atomically revises a contract and credits the accounts
// returning the new balance of each account.
func (cm *Manager) CreditAccountsWithContract(deposits []proto4.AccountDeposit, contractID types.FileContractID, revision types.V2FileContract) ([]types.Currency, error) {
return cm.store.RHP4CreditAccounts(deposits, contractID, revision)
func (cm *Manager) CreditAccountsWithContract(deposits []proto4.AccountDeposit, contractID types.FileContractID, revision types.V2FileContract, usage proto4.Usage) ([]types.Currency, error) {
return cm.store.RHP4CreditAccounts(deposits, contractID, revision, usage)
}

// DebitAccount debits an account.
Expand Down
2 changes: 1 addition & 1 deletion host/contracts/integrity.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (cm *Manager) CheckIntegrity(ctx context.Context, contractID types.FileCont
default:
}
// read each sector from disk and verify its Merkle root
sector, err := cm.storage.Read(root)
sector, err := cm.storage.ReadSector(root)
if err != nil { // sector read failed
log.Error("missing sector", zap.String("root", root.String()), zap.Error(err))
missing++
Expand Down
2 changes: 1 addition & 1 deletion host/contracts/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type (
// A StorageManager stores and retrieves sectors.
StorageManager interface {
// Read reads a sector from the store
Read(root types.Hash256) (*[rhp2.SectorSize]byte, error)
ReadSector(root types.Hash256) (*[rhp2.SectorSize]byte, error)
}

// Alerts registers and dismisses global alerts.
Expand Down
2 changes: 1 addition & 1 deletion host/contracts/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type (
// RHP4AccountBalance returns the balance of an account.
RHP4AccountBalance(proto4.Account) (types.Currency, error)
// RHP4CreditAccounts atomically revises a contract and credits the accounts
RHP4CreditAccounts([]proto4.AccountDeposit, types.FileContractID, types.V2FileContract) (balances []types.Currency, err error)
RHP4CreditAccounts([]proto4.AccountDeposit, types.FileContractID, types.V2FileContract, proto4.Usage) (balances []types.Currency, err error)
// RHP4DebitAccount debits an account.
RHP4DebitAccount(proto4.Account, proto4.Usage) error
}
Expand Down
4 changes: 2 additions & 2 deletions host/contracts/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (cm *Manager) buildStorageProof(revision types.FileContractRevision, index
}

sectorRoot := roots[sectorIndex]
sector, err := cm.storage.Read(sectorRoot)
sector, err := cm.storage.ReadSector(sectorRoot)
if err != nil {
log.Error("failed to read sector data", zap.Error(err), zap.Stringer("sectorRoot", sectorRoot))
return types.StorageProof{}, fmt.Errorf("failed to read sector data")
Expand Down Expand Up @@ -158,7 +158,7 @@ func (cm *Manager) buildV2StorageProof(cs consensus.State, fce types.V2FileContr
}

sectorRoot := roots[sectorIndex]
sector, err := cm.storage.Read(sectorRoot)
sector, err := cm.storage.ReadSector(sectorRoot)
if err != nil {
log.Error("failed to read sector data", zap.Error(err), zap.Stringer("sectorRoot", sectorRoot))
return types.V2StorageProof{}, fmt.Errorf("failed to read sector data")
Expand Down
4 changes: 2 additions & 2 deletions host/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,8 +849,8 @@ func (vm *VolumeManager) readLocation(loc SectorLocation) (*[proto2.SectorSize]b
return sector, nil
}

// Read reads the sector with the given root
func (vm *VolumeManager) Read(root types.Hash256) (*[proto2.SectorSize]byte, error) {
// ReadSector reads the sector with the given root from disk
func (vm *VolumeManager) ReadSector(root types.Hash256) (*[proto2.SectorSize]byte, error) {
done, err := vm.tg.Add()
if err != nil {
return nil, err
Expand Down
18 changes: 9 additions & 9 deletions host/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestVolumeLoad(t *testing.T) {
}

// check that the sector is still there
sector2, err := vm.Read(root)
sector2, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
} else if *sector2 != sector {
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestRemoveVolume(t *testing.T) {

checkRoots := func(roots []types.Hash256) error {
for _, root := range roots {
sector, err := vm.Read(root)
sector, err := vm.ReadSector(root)
if err != nil {
return fmt.Errorf("failed to read sector: %w", err)
} else if rhp2.SectorRoot(sector) != root {
Expand Down Expand Up @@ -631,7 +631,7 @@ func TestVolumeConcurrency(t *testing.T) {

// read the sectors back
for _, root := range roots {
sector, err := vm.Read(root)
sector, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
} else if rhp2.SectorRoot(sector) != root {
Expand All @@ -646,7 +646,7 @@ func TestVolumeConcurrency(t *testing.T) {

// read the sectors back
for _, root := range roots {
sector, err := vm.Read(root)
sector, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
} else if rhp2.SectorRoot(sector) != root {
Expand Down Expand Up @@ -1003,7 +1003,7 @@ func TestVolumeManagerReadWrite(t *testing.T) {
// read the sectors back
frand.Shuffle(len(roots), func(i, j int) { roots[i], roots[j] = roots[j], roots[i] })
for _, root := range roots {
sector, err := vm.Read(root)
sector, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1093,7 +1093,7 @@ func TestSectorCache(t *testing.T) {

// read the last 5 sectors all sectors should be cached
for i, root := range roots[5:] {
_, err := vm.Read(root)
_, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
}
Expand All @@ -1108,7 +1108,7 @@ func TestSectorCache(t *testing.T) {

// read the first 5 sectors all sectors should be missed
for i, root := range roots[:5] {
_, err := vm.Read(root)
_, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
}
Expand All @@ -1123,7 +1123,7 @@ func TestSectorCache(t *testing.T) {

// read the first 5 sectors again all sectors should be cached
for i, root := range roots[:5] {
_, err := vm.Read(root)
_, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1351,7 +1351,7 @@ func BenchmarkVolumeManagerRead(b *testing.B) {
b.SetBytes(rhp2.SectorSize)
// read the sectors back
for _, root := range written {
if _, err := vm.Read(root); err != nil {
if _, err := vm.ReadSector(root); err != nil {
b.Fatal(err)
}
}
Expand Down
4 changes: 1 addition & 3 deletions persist/sqlite/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *Store) RHP4DebitAccount(account proto4.Account, usage proto4.Usage) err

// RHP4CreditAccounts credits the accounts with the given deposits and revises
// the contract.
func (s *Store) RHP4CreditAccounts(deposits []proto4.AccountDeposit, contractID types.FileContractID, revision types.V2FileContract) (balances []types.Currency, err error) {
func (s *Store) RHP4CreditAccounts(deposits []proto4.AccountDeposit, contractID types.FileContractID, revision types.V2FileContract, usage proto4.Usage) (balances []types.Currency, err error) {
err = s.transaction(func(tx *txn) error {
getBalanceStmt, err := tx.Prepare(`SELECT balance FROM accounts WHERE account_id=$1`)
if err != nil {
Expand Down Expand Up @@ -92,7 +92,6 @@ func (s *Store) RHP4CreditAccounts(deposits []proto4.AccountDeposit, contractID
return fmt.Errorf("failed to get contract ID: %w", err)
}

var usage proto4.Usage
var createdAccounts int
for _, deposit := range deposits {
var balance types.Currency
Expand Down Expand Up @@ -120,7 +119,6 @@ func (s *Store) RHP4CreditAccounts(deposits []proto4.AccountDeposit, contractID
if _, err := updateFundingAmountStmt.Exec(contractDBID, accountDBID, encode(fundAmount)); err != nil {
return fmt.Errorf("failed to update funding amount: %w", err)
}
usage.AccountFunding = usage.AccountFunding.Add(deposit.Amount)
}

_, err = reviseV2Contract(tx, contractID, revision, usage)
Expand Down
6 changes: 3 additions & 3 deletions persist/sqlite/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestRHP4Accounts(t *testing.T) {
// deposit funds
balances, err := db.RHP4CreditAccounts([]proto4.AccountDeposit{
{Account: account, Amount: types.Siacoins(10)},
}, contract.ID, contract.V2FileContract)
}, contract.ID, contract.V2FileContract, proto4.Usage{AccountFunding: types.Siacoins(10)})
if err != nil {
t.Fatal(err)
} else if len(balances) != 1 {
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestRHP4AccountsDistribution(t *testing.T) {

balances, err := db.RHP4CreditAccounts([]proto4.AccountDeposit{
{Account: account, Amount: types.Siacoins(3)},
}, c1.ID, c1.V2FileContract)
}, c1.ID, c1.V2FileContract, proto4.Usage{AccountFunding: types.Siacoins(3)})
if err != nil {
t.Fatal(err)
} else if len(balances) != 1 {
Expand All @@ -318,7 +318,7 @@ func TestRHP4AccountsDistribution(t *testing.T) {

balances, err = db.RHP4CreditAccounts([]proto4.AccountDeposit{
{Account: account, Amount: types.Siacoins(3)},
}, c2.ID, c2.V2FileContract)
}, c2.ID, c2.V2FileContract, proto4.Usage{AccountFunding: types.Siacoins(3)})
if err != nil {
t.Fatal(err)
} else if len(balances) != 1 {
Expand Down
47 changes: 47 additions & 0 deletions rhp/siamux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package rhp

import (
"crypto/ed25519"
"net"

rhp4 "go.sia.tech/coreutils/rhp/v4"
"go.sia.tech/mux/v2"
"go.uber.org/zap"
)

// A muxTransport is a rhp4.Transport that wraps a mux.Mux.
type muxTransport struct {
m *mux.Mux
}

// Close implements the rhp4.Transport interface.
func (mt *muxTransport) Close() error {
return mt.m.Close()
}

// AcceptStream implements the rhp4.Transport interface.
func (mt *muxTransport) AcceptStream() (net.Conn, error) {
return mt.m.AcceptStream()
}

// ServeRHP4SiaMux serves RHP4 connections on l using the provided server and logger.
func ServeRHP4SiaMux(l net.Listener, s *rhp4.Server, log *zap.Logger) {
for {
conn, err := l.Accept()
if err != nil {
log.Error("failed to accept connection", zap.Error(err))
return
}
log := log.With(zap.String("peerAddress", conn.RemoteAddr().String()))
go func() {
defer conn.Close()

m, err := mux.Accept(conn, ed25519.PrivateKey(s.HostKey()))
if err != nil {
log.Debug("failed to accept mux connection", zap.Error(err))
} else if err := s.Serve(&muxTransport{m}, log); err != nil {
log.Debug("failed to serve connection", zap.Error(err))
}
}()
}
}
4 changes: 2 additions & 2 deletions rhp/v2/rhp.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type (
Sectors interface {
// Write writes a sector to persistent storage
Write(root types.Hash256, data *[rhp2.SectorSize]byte) error
// Read reads the sector with the given root from the manager.
Read(root types.Hash256) (*[rhp2.SectorSize]byte, error)
// ReadSector reads the sector with the given root from the manager.
ReadSector(root types.Hash256) (*[rhp2.SectorSize]byte, error)
// Sync syncs the data files of changed volumes.
Sync() error
}
Expand Down
4 changes: 2 additions & 2 deletions rhp/v2/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) (contracts.Usage
return contracts.Usage{}, err
}

sector, err := sh.sectors.Read(root)
sector, err := sh.sectors.ReadSector(root)
if err != nil {
s.t.WriteResponseErr(ErrHostInternalError)
return contracts.Usage{}, fmt.Errorf("failed to read sector %v: %w", root, err)
Expand Down Expand Up @@ -849,7 +849,7 @@ func (sh *SessionHandler) rpcRead(s *session, log *zap.Logger) (contracts.Usage,

// enter response loop
for i, sec := range req.Sections {
sector, err := sh.sectors.Read(sec.MerkleRoot)
sector, err := sh.sectors.ReadSector(sec.MerkleRoot)
if err != nil {
err := fmt.Errorf("failed to get sector: %w", err)
s.t.WriteResponseErr(err)
Expand Down
Loading
Loading