From 820302a3f0b9e82ff65e7ab384937fd9cc797fb6 Mon Sep 17 00:00:00 2001 From: Nate Date: Tue, 10 Dec 2024 15:31:19 -0800 Subject: [PATCH] feat(rhp4): Add RHP4 listener --- api/api.go | 2 +- api/volumes.go | 2 +- cmd/hostd/main.go | 8 ++++++ cmd/hostd/run.go | 25 ++++++++++++++++++ config/config.go | 22 +++++++++++---- host/contracts/accounts.go | 4 +-- host/contracts/integrity.go | 2 +- host/contracts/manager.go | 2 +- host/contracts/persist.go | 2 +- host/contracts/update.go | 4 +-- host/storage/storage.go | 4 +-- host/storage/storage_test.go | 18 ++++++------- persist/sqlite/accounts.go | 4 +-- persist/sqlite/accounts_test.go | 6 ++--- rhp/siamux.go | 47 +++++++++++++++++++++++++++++++++ rhp/v2/rhp.go | 4 +-- rhp/v2/rpc.go | 4 +-- rhp/v3/execute.go | 6 ++--- rhp/v3/rhp.go | 4 +-- 19 files changed, 130 insertions(+), 40 deletions(-) create mode 100644 rhp/siamux.go diff --git a/api/api.go b/api/api.go index 4e7573f8..41de4e7b 100644 --- a/api/api.go +++ b/api/api.go @@ -85,7 +85,7 @@ type ( SetReadOnly(id int64, readOnly bool) error RemoveSector(root types.Hash256) error ResizeCache(size uint32) - Read(types.Hash256) (*[rhp2.SectorSize]byte, error) + ReadSector(types.Hash256) (*[rhp2.SectorSize]byte, error) // SectorReferences returns the references to a sector SectorReferences(root types.Hash256) (storage.SectorReference, error) diff --git a/api/volumes.go b/api/volumes.go index c8552a40..bed71e7d 100644 --- a/api/volumes.go +++ b/api/volumes.go @@ -224,7 +224,7 @@ func (a *api) handleGETVerifySector(jc jape.Context) { } // try to read the sector data and verify the root - data, err := a.volumes.Read(root) + data, err := a.volumes.ReadSector(root) if err != nil { resp.Error = err.Error() } else if calc := rhp2.SectorRoot(data); calc != root { diff --git a/cmd/hostd/main.go b/cmd/hostd/main.go index 50ba60fe..0be39da7 100644 --- a/cmd/hostd/main.go +++ b/cmd/hostd/main.go @@ -57,6 +57,14 @@ var ( RHP3: config.RHP3{ TCPAddress: ":9983", }, + RHP4: config.RHP4{ + ListenAddresses: []config.RHP4ListenAddress{ + { + Protocol: "tcp", + Address: ":9984", + }, + }, + }, Log: config.Log{ Path: os.Getenv(logFileEnvVar), // deprecated. included for compatibility. Level: "info", diff --git a/cmd/hostd/run.go b/cmd/hostd/run.go index 074ae320..578b31f3 100644 --- a/cmd/hostd/run.go +++ b/cmd/hostd/run.go @@ -16,6 +16,7 @@ import ( "go.sia.tech/core/types" "go.sia.tech/coreutils" "go.sia.tech/coreutils/chain" + rhp4 "go.sia.tech/coreutils/rhp/v4" "go.sia.tech/coreutils/syncer" "go.sia.tech/coreutils/wallet" "go.sia.tech/hostd/alerts" @@ -333,6 +334,30 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK go rhp3.Serve() defer rhp3.Close() + rhp4 := rhp4.NewServer(hostKey, cm, s, contractManager, wm, sm, vm, rhp4.WithPriceTableValidity(30*time.Minute), rhp4.WithContractProofWindowBuffer(72)) + + var stopListenerFuncs []func() error + defer func() { + for _, f := range stopListenerFuncs { + if err := f(); err != nil { + log.Error("failed to stop listener", zap.Error(err)) + } + } + }() + for _, addr := range cfg.RHP4.ListenAddresses { + switch addr.Protocol { + case "tcp", "tcp4", "tcp6": + l, err := rhp.Listen(addr.Protocol, addr.Address, rhp.WithDataMonitor(dr), rhp.WithReadLimit(rl), rhp.WithWriteLimit(wl)) + if err != nil { + return fmt.Errorf("failed to listen on rhp4 addr: %w", err) + } + stopListenerFuncs = append(stopListenerFuncs, l.Close) + go rhp.ServeRHP4SiaMux(l, rhp4, log.Named("rhp4")) + default: + return fmt.Errorf("unsupported protocol: %s", addr.Protocol) + } + } + apiOpts := []api.ServerOption{ api.WithAlerts(am), api.WithLogger(log.Named("api")), diff --git a/config/config.go b/config/config.go index 38576cc8..f899eee5 100644 --- a/config/config.go +++ b/config/config.go @@ -21,22 +21,33 @@ type ( IndexBatchSize int `yaml:"indexBatchSize,omitempty"` } - // RHP2 contains the configuration for the RHP2 server. - RHP2 struct { - Address string `yaml:"address,omitempty"` - } - // ExplorerData contains the configuration for using an external explorer. ExplorerData struct { Disable bool `yaml:"disable,omitempty"` URL string `yaml:"url,omitempty"` } + // RHP2 contains the configuration for the RHP2 server. + RHP2 struct { + Address string `yaml:"address,omitempty"` + } + // RHP3 contains the configuration for the RHP3 server. RHP3 struct { TCPAddress string `yaml:"tcp,omitempty"` } + // RHP4ListenAddress contains the configuration for an RHP4 listen address. + RHP4ListenAddress struct { + Protocol string `yaml:"protocol,omitempty"` + Address string `yaml:"address,omitempty"` + } + + // RHP4 contains the configuration for the RHP4 server. + RHP4 struct { + ListenAddresses []RHP4ListenAddress `yaml:"listenAddresses,omitempty"` + } + // LogFile configures the file output of the logger. LogFile struct { Enabled bool `yaml:"enabled,omitempty"` @@ -77,6 +88,7 @@ type ( Explorer ExplorerData `yaml:"explorer,omitempty"` RHP2 RHP2 `yaml:"rhp2,omitempty"` RHP3 RHP3 `yaml:"rhp3,omitempty"` + RHP4 RHP4 `yaml:"rhp4,omitempty"` Log Log `yaml:"log,omitempty"` } ) diff --git a/host/contracts/accounts.go b/host/contracts/accounts.go index d5999e04..8d528263 100644 --- a/host/contracts/accounts.go +++ b/host/contracts/accounts.go @@ -12,8 +12,8 @@ func (cm *Manager) AccountBalance(account proto4.Account) (types.Currency, error // CreditAccountsWithContract atomically revises a contract and credits the accounts // returning the new balance of each account. -func (cm *Manager) CreditAccountsWithContract(deposits []proto4.AccountDeposit, contractID types.FileContractID, revision types.V2FileContract) ([]types.Currency, error) { - return cm.store.RHP4CreditAccounts(deposits, contractID, revision) +func (cm *Manager) CreditAccountsWithContract(deposits []proto4.AccountDeposit, contractID types.FileContractID, revision types.V2FileContract, usage proto4.Usage) ([]types.Currency, error) { + return cm.store.RHP4CreditAccounts(deposits, contractID, revision, usage) } // DebitAccount debits an account. diff --git a/host/contracts/integrity.go b/host/contracts/integrity.go index aee4e633..76f7d0d5 100644 --- a/host/contracts/integrity.go +++ b/host/contracts/integrity.go @@ -116,7 +116,7 @@ func (cm *Manager) CheckIntegrity(ctx context.Context, contractID types.FileCont default: } // read each sector from disk and verify its Merkle root - sector, err := cm.storage.Read(root) + sector, err := cm.storage.ReadSector(root) if err != nil { // sector read failed log.Error("missing sector", zap.String("root", root.String()), zap.Error(err)) missing++ diff --git a/host/contracts/manager.go b/host/contracts/manager.go index 97e492c7..ace8a486 100644 --- a/host/contracts/manager.go +++ b/host/contracts/manager.go @@ -51,7 +51,7 @@ type ( // A StorageManager stores and retrieves sectors. StorageManager interface { // Read reads a sector from the store - Read(root types.Hash256) (*[rhp2.SectorSize]byte, error) + ReadSector(root types.Hash256) (*[rhp2.SectorSize]byte, error) } // Alerts registers and dismisses global alerts. diff --git a/host/contracts/persist.go b/host/contracts/persist.go index 03636247..ae1a458f 100644 --- a/host/contracts/persist.go +++ b/host/contracts/persist.go @@ -59,7 +59,7 @@ type ( // RHP4AccountBalance returns the balance of an account. RHP4AccountBalance(proto4.Account) (types.Currency, error) // RHP4CreditAccounts atomically revises a contract and credits the accounts - RHP4CreditAccounts([]proto4.AccountDeposit, types.FileContractID, types.V2FileContract) (balances []types.Currency, err error) + RHP4CreditAccounts([]proto4.AccountDeposit, types.FileContractID, types.V2FileContract, proto4.Usage) (balances []types.Currency, err error) // RHP4DebitAccount debits an account. RHP4DebitAccount(proto4.Account, proto4.Usage) error } diff --git a/host/contracts/update.go b/host/contracts/update.go index a094520b..e1e72c7d 100644 --- a/host/contracts/update.go +++ b/host/contracts/update.go @@ -115,7 +115,7 @@ func (cm *Manager) buildStorageProof(revision types.FileContractRevision, index } sectorRoot := roots[sectorIndex] - sector, err := cm.storage.Read(sectorRoot) + sector, err := cm.storage.ReadSector(sectorRoot) if err != nil { log.Error("failed to read sector data", zap.Error(err), zap.Stringer("sectorRoot", sectorRoot)) return types.StorageProof{}, fmt.Errorf("failed to read sector data") @@ -158,7 +158,7 @@ func (cm *Manager) buildV2StorageProof(cs consensus.State, fce types.V2FileContr } sectorRoot := roots[sectorIndex] - sector, err := cm.storage.Read(sectorRoot) + sector, err := cm.storage.ReadSector(sectorRoot) if err != nil { log.Error("failed to read sector data", zap.Error(err), zap.Stringer("sectorRoot", sectorRoot)) return types.V2StorageProof{}, fmt.Errorf("failed to read sector data") diff --git a/host/storage/storage.go b/host/storage/storage.go index 6457e93b..076994af 100644 --- a/host/storage/storage.go +++ b/host/storage/storage.go @@ -849,8 +849,8 @@ func (vm *VolumeManager) readLocation(loc SectorLocation) (*[proto2.SectorSize]b return sector, nil } -// Read reads the sector with the given root -func (vm *VolumeManager) Read(root types.Hash256) (*[proto2.SectorSize]byte, error) { +// ReadSector reads the sector with the given root from disk +func (vm *VolumeManager) ReadSector(root types.Hash256) (*[proto2.SectorSize]byte, error) { done, err := vm.tg.Add() if err != nil { return nil, err diff --git a/host/storage/storage_test.go b/host/storage/storage_test.go index 11e4e1da..10196311 100644 --- a/host/storage/storage_test.go +++ b/host/storage/storage_test.go @@ -90,7 +90,7 @@ func TestVolumeLoad(t *testing.T) { } // check that the sector is still there - sector2, err := vm.Read(root) + sector2, err := vm.ReadSector(root) if err != nil { t.Fatal(err) } else if *sector2 != sector { @@ -197,7 +197,7 @@ func TestRemoveVolume(t *testing.T) { checkRoots := func(roots []types.Hash256) error { for _, root := range roots { - sector, err := vm.Read(root) + sector, err := vm.ReadSector(root) if err != nil { return fmt.Errorf("failed to read sector: %w", err) } else if rhp2.SectorRoot(sector) != root { @@ -631,7 +631,7 @@ func TestVolumeConcurrency(t *testing.T) { // read the sectors back for _, root := range roots { - sector, err := vm.Read(root) + sector, err := vm.ReadSector(root) if err != nil { t.Fatal(err) } else if rhp2.SectorRoot(sector) != root { @@ -646,7 +646,7 @@ func TestVolumeConcurrency(t *testing.T) { // read the sectors back for _, root := range roots { - sector, err := vm.Read(root) + sector, err := vm.ReadSector(root) if err != nil { t.Fatal(err) } else if rhp2.SectorRoot(sector) != root { @@ -1003,7 +1003,7 @@ func TestVolumeManagerReadWrite(t *testing.T) { // read the sectors back frand.Shuffle(len(roots), func(i, j int) { roots[i], roots[j] = roots[j], roots[i] }) for _, root := range roots { - sector, err := vm.Read(root) + sector, err := vm.ReadSector(root) if err != nil { t.Fatal(err) } @@ -1093,7 +1093,7 @@ func TestSectorCache(t *testing.T) { // read the last 5 sectors all sectors should be cached for i, root := range roots[5:] { - _, err := vm.Read(root) + _, err := vm.ReadSector(root) if err != nil { t.Fatal(err) } @@ -1108,7 +1108,7 @@ func TestSectorCache(t *testing.T) { // read the first 5 sectors all sectors should be missed for i, root := range roots[:5] { - _, err := vm.Read(root) + _, err := vm.ReadSector(root) if err != nil { t.Fatal(err) } @@ -1123,7 +1123,7 @@ func TestSectorCache(t *testing.T) { // read the first 5 sectors again all sectors should be cached for i, root := range roots[:5] { - _, err := vm.Read(root) + _, err := vm.ReadSector(root) if err != nil { t.Fatal(err) } @@ -1351,7 +1351,7 @@ func BenchmarkVolumeManagerRead(b *testing.B) { b.SetBytes(rhp2.SectorSize) // read the sectors back for _, root := range written { - if _, err := vm.Read(root); err != nil { + if _, err := vm.ReadSector(root); err != nil { b.Fatal(err) } } diff --git a/persist/sqlite/accounts.go b/persist/sqlite/accounts.go index 63c24bbe..d1738d43 100644 --- a/persist/sqlite/accounts.go +++ b/persist/sqlite/accounts.go @@ -60,7 +60,7 @@ func (s *Store) RHP4DebitAccount(account proto4.Account, usage proto4.Usage) err // RHP4CreditAccounts credits the accounts with the given deposits and revises // the contract. -func (s *Store) RHP4CreditAccounts(deposits []proto4.AccountDeposit, contractID types.FileContractID, revision types.V2FileContract) (balances []types.Currency, err error) { +func (s *Store) RHP4CreditAccounts(deposits []proto4.AccountDeposit, contractID types.FileContractID, revision types.V2FileContract, usage proto4.Usage) (balances []types.Currency, err error) { err = s.transaction(func(tx *txn) error { getBalanceStmt, err := tx.Prepare(`SELECT balance FROM accounts WHERE account_id=$1`) if err != nil { @@ -92,7 +92,6 @@ func (s *Store) RHP4CreditAccounts(deposits []proto4.AccountDeposit, contractID return fmt.Errorf("failed to get contract ID: %w", err) } - var usage proto4.Usage var createdAccounts int for _, deposit := range deposits { var balance types.Currency @@ -120,7 +119,6 @@ func (s *Store) RHP4CreditAccounts(deposits []proto4.AccountDeposit, contractID if _, err := updateFundingAmountStmt.Exec(contractDBID, accountDBID, encode(fundAmount)); err != nil { return fmt.Errorf("failed to update funding amount: %w", err) } - usage.AccountFunding = usage.AccountFunding.Add(deposit.Amount) } _, err = reviseV2Contract(tx, contractID, revision, usage) diff --git a/persist/sqlite/accounts_test.go b/persist/sqlite/accounts_test.go index d87aec4a..1c70a1c0 100644 --- a/persist/sqlite/accounts_test.go +++ b/persist/sqlite/accounts_test.go @@ -87,7 +87,7 @@ func TestRHP4Accounts(t *testing.T) { // deposit funds balances, err := db.RHP4CreditAccounts([]proto4.AccountDeposit{ {Account: account, Amount: types.Siacoins(10)}, - }, contract.ID, contract.V2FileContract) + }, contract.ID, contract.V2FileContract, proto4.Usage{AccountFunding: types.Siacoins(10)}) if err != nil { t.Fatal(err) } else if len(balances) != 1 { @@ -303,7 +303,7 @@ func TestRHP4AccountsDistribution(t *testing.T) { balances, err := db.RHP4CreditAccounts([]proto4.AccountDeposit{ {Account: account, Amount: types.Siacoins(3)}, - }, c1.ID, c1.V2FileContract) + }, c1.ID, c1.V2FileContract, proto4.Usage{AccountFunding: types.Siacoins(3)}) if err != nil { t.Fatal(err) } else if len(balances) != 1 { @@ -318,7 +318,7 @@ func TestRHP4AccountsDistribution(t *testing.T) { balances, err = db.RHP4CreditAccounts([]proto4.AccountDeposit{ {Account: account, Amount: types.Siacoins(3)}, - }, c2.ID, c2.V2FileContract) + }, c2.ID, c2.V2FileContract, proto4.Usage{AccountFunding: types.Siacoins(3)}) if err != nil { t.Fatal(err) } else if len(balances) != 1 { diff --git a/rhp/siamux.go b/rhp/siamux.go new file mode 100644 index 00000000..0ef15314 --- /dev/null +++ b/rhp/siamux.go @@ -0,0 +1,47 @@ +package rhp + +import ( + "crypto/ed25519" + "net" + + rhp4 "go.sia.tech/coreutils/rhp/v4" + "go.sia.tech/mux/v2" + "go.uber.org/zap" +) + +// A muxTransport is a rhp4.Transport that wraps a mux.Mux. +type muxTransport struct { + m *mux.Mux +} + +// Close implements the rhp4.Transport interface. +func (mt *muxTransport) Close() error { + return mt.m.Close() +} + +// AcceptStream implements the rhp4.Transport interface. +func (mt *muxTransport) AcceptStream() (net.Conn, error) { + return mt.m.AcceptStream() +} + +// ServeRHP4SiaMux serves RHP4 connections on l using the provided server and logger. +func ServeRHP4SiaMux(l net.Listener, s *rhp4.Server, log *zap.Logger) { + for { + conn, err := l.Accept() + if err != nil { + log.Error("failed to accept connection", zap.Error(err)) + return + } + log := log.With(zap.String("peerAddress", conn.RemoteAddr().String())) + go func() { + defer conn.Close() + + m, err := mux.Accept(conn, ed25519.PrivateKey(s.HostKey())) + if err != nil { + log.Debug("failed to accept mux connection", zap.Error(err)) + } else if err := s.Serve(&muxTransport{m}, log); err != nil { + log.Debug("failed to serve connection", zap.Error(err)) + } + }() + } +} diff --git a/rhp/v2/rhp.go b/rhp/v2/rhp.go index 813f44b9..3b795508 100644 --- a/rhp/v2/rhp.go +++ b/rhp/v2/rhp.go @@ -52,8 +52,8 @@ type ( Sectors interface { // Write writes a sector to persistent storage Write(root types.Hash256, data *[rhp2.SectorSize]byte) error - // Read reads the sector with the given root from the manager. - Read(root types.Hash256) (*[rhp2.SectorSize]byte, error) + // ReadSector reads the sector with the given root from the manager. + ReadSector(root types.Hash256) (*[rhp2.SectorSize]byte, error) // Sync syncs the data files of changed volumes. Sync() error } diff --git a/rhp/v2/rpc.go b/rhp/v2/rpc.go index e18363ee..7d9bff26 100644 --- a/rhp/v2/rpc.go +++ b/rhp/v2/rpc.go @@ -652,7 +652,7 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) (contracts.Usage return contracts.Usage{}, err } - sector, err := sh.sectors.Read(root) + sector, err := sh.sectors.ReadSector(root) if err != nil { s.t.WriteResponseErr(ErrHostInternalError) return contracts.Usage{}, fmt.Errorf("failed to read sector %v: %w", root, err) @@ -849,7 +849,7 @@ func (sh *SessionHandler) rpcRead(s *session, log *zap.Logger) (contracts.Usage, // enter response loop for i, sec := range req.Sections { - sector, err := sh.sectors.Read(sec.MerkleRoot) + sector, err := sh.sectors.ReadSector(sec.MerkleRoot) if err != nil { err := fmt.Errorf("failed to get sector: %w", err) s.t.WriteResponseErr(err) diff --git a/rhp/v3/execute.go b/rhp/v3/execute.go index 1d139110..b72c7323 100644 --- a/rhp/v3/execute.go +++ b/rhp/v3/execute.go @@ -221,7 +221,7 @@ func (pe *programExecutor) executeReadOffset(instr *rhp3.InstrReadOffset, log *z return nil, nil, fmt.Errorf("failed to get root: %w", err) } - sector, err := pe.sectors.Read(root) + sector, err := pe.sectors.ReadSector(root) if err != nil { return nil, nil, fmt.Errorf("failed to read sector: %w", err) } @@ -270,7 +270,7 @@ func (pe *programExecutor) executeReadSector(instr *rhp3.InstrReadSector, log *z } // read the sector - sector, err := pe.sectors.Read(root) + sector, err := pe.sectors.ReadSector(root) if errors.Is(err, storage.ErrSectorNotFound) { log.Debug("failed to read sector", zap.String("root", root.String()), zap.Error(err)) return nil, nil, storage.ErrSectorNotFound @@ -359,7 +359,7 @@ func (pe *programExecutor) executeUpdateSector(instr *rhp3.InstrUpdateSector, _ return nil, nil, fmt.Errorf("failed to get root: %w", err) } - sector, err := pe.sectors.Read(oldRoot) + sector, err := pe.sectors.ReadSector(oldRoot) if err != nil { return nil, nil, fmt.Errorf("failed to read sector: %w", err) } diff --git a/rhp/v3/rhp.go b/rhp/v3/rhp.go index 3b6ff796..7270d678 100644 --- a/rhp/v3/rhp.go +++ b/rhp/v3/rhp.go @@ -55,8 +55,8 @@ type ( HasSector(root types.Hash256) (bool, error) // Write writes a sector to persistent storage. Write(root types.Hash256, data *[rhp2.SectorSize]byte) error - // Read reads the sector with the given root from the manager. - Read(root types.Hash256) (*[rhp2.SectorSize]byte, error) + // ReadSector reads the sector with the given root from the manager. + ReadSector(root types.Hash256) (*[rhp2.SectorSize]byte, error) // Sync syncs the data files of changed volumes. Sync() error