Skip to content

Commit

Permalink
cmd,settings,rhp: remove RHP session reporter and refactor rate limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Nov 18, 2024
1 parent 0b367ae commit 55b9357
Show file tree
Hide file tree
Showing 20 changed files with 182 additions and 645 deletions.
14 changes: 0 additions & 14 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.sia.tech/hostd/host/settings"
"go.sia.tech/hostd/host/settings/pin"
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/rhp"
"go.sia.tech/hostd/webhooks"
"go.sia.tech/jape"
"go.uber.org/zap"
Expand Down Expand Up @@ -152,14 +151,6 @@ type (
BroadcastToWebhook(id int64, event, scope string, data interface{}) error
}

// A RHPSessionReporter reports on RHP session lifecycle events
RHPSessionReporter interface {
Subscribe(rhp.SessionSubscriber)
Unsubscribe(rhp.SessionSubscriber)

Active() []rhp.Session
}

// An api provides an HTTP API for the host
api struct {
hostKey types.PublicKey
Expand All @@ -168,7 +159,6 @@ type (
log *zap.Logger
alerts Alerts
webhooks Webhooks
sessions RHPSessionReporter

sqlite3Store SQLite3Store

Expand Down Expand Up @@ -216,7 +206,6 @@ func NewServer(name string, hostKey types.PublicKey, cm ChainManager, s Syncer,
hostKey: hostKey,
name: name,

sessions: noopSessionReporter{},
alerts: noopAlerts{},
webhooks: noopWebhooks{},
log: zap.NewNop(),
Expand Down Expand Up @@ -291,9 +280,6 @@ func NewServer(name string, hostKey types.PublicKey, cm ChainManager, s Syncer,
"DELETE /volumes/:id": a.handleDeleteVolume,
"DELETE /volumes/:id/cancel": a.handleDELETEVolumeCancelOp,
"PUT /volumes/:id/resize": a.handlePUTVolumeResize,
// session endpoints
"GET /sessions": a.handleGETSessions,
"GET /sessions/subscribe": a.handleGETSessionsSubscribe,
// tpool endpoints
"GET /tpool/fee": a.handleGETTPoolFee,
// wallet endpoints
Expand Down
14 changes: 0 additions & 14 deletions api/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/hostd/alerts"
"go.sia.tech/hostd/explorer"
"go.sia.tech/hostd/rhp"
"go.sia.tech/hostd/webhooks"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -51,13 +50,6 @@ func WithExplorer(explorer *explorer.Explorer) ServerOption {
}
}

// WithRHPSessionReporter sets the RHP session reporter for the API server.
func WithRHPSessionReporter(rsr RHPSessionReporter) ServerOption {
return func(a *api) {
a.sessions = rsr
}
}

// WithLogger sets the logger for the API server.
func WithLogger(log *zap.Logger) ServerOption {
return func(a *api) {
Expand All @@ -81,9 +73,3 @@ type noopAlerts struct{}

func (noopAlerts) Active() []alerts.Alert { return nil }
func (noopAlerts) Dismiss(...types.Hash256) {}

type noopSessionReporter struct{}

func (noopSessionReporter) Subscribe(rhp.SessionSubscriber) {}
func (noopSessionReporter) Unsubscribe(rhp.SessionSubscriber) {}
func (noopSessionReporter) Active() []rhp.Session { return nil }
21 changes: 0 additions & 21 deletions api/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,24 +462,3 @@ func (w WalletPendingResp) PrometheusMetric() (metrics []prometheus.Metric) {
}
return
}

// PrometheusMetric returns Prometheus samples for the hosts sessions
func (s SessionResp) PrometheusMetric() (metrics []prometheus.Metric) {
for _, session := range s {
metrics = append(metrics, prometheus.Metric{
Name: "hostd_session_ingress",
Labels: map[string]any{
"peer": session.PeerAddress,
},
Value: float64(session.Ingress),
})
metrics = append(metrics, prometheus.Metric{
Name: "hostd_session_egress",
Labels: map[string]any{
"peer": session.PeerAddress,
},
Value: float64(session.Egress),
})
}
return
}
45 changes: 0 additions & 45 deletions api/rhpsessions.go

This file was deleted.

4 changes: 0 additions & 4 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.sia.tech/hostd/host/metrics"
"go.sia.tech/hostd/host/settings"
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/rhp"
)

// JSON keys for host setting fields
Expand Down Expand Up @@ -193,9 +192,6 @@ type (

// WalletPendingResp is the response body for the [GET] /wallet/pending endpoint
WalletPendingResp []wallet.Event

// SessionResp is the response body for the [GET] /sessions endpoint
SessionResp []rhp.Session
)

// MarshalJSON implements json.Marshaler
Expand Down
43 changes: 21 additions & 22 deletions cmd/hostd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,18 +186,6 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK
}
defer syncerListener.Close()

rhp2Listener, err := net.Listen("tcp", cfg.RHP2.Address)
if err != nil {
return fmt.Errorf("failed to listen on rhp2 addr: %w", err)
}
defer rhp2Listener.Close()

rhp3Listener, err := net.Listen("tcp", cfg.RHP3.TCPAddress)
if err != nil {
return fmt.Errorf("failed to listen on rhp3 addr: %w", err)
}
defer rhp3Listener.Close()

syncerAddr := syncerListener.Addr().String()
if cfg.Syncer.EnableUPnP {
_, portStr, _ := net.SplitHostPort(cfg.Syncer.Address)
Expand Down Expand Up @@ -249,15 +237,14 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK
return fmt.Errorf("failed to create webhook reporter: %w", err)
}
defer wr.Close()
sr := rhp.NewSessionReporter()

am := alerts.NewManager(alerts.WithEventReporter(wr), alerts.WithLog(log.Named("alerts")))

cfm, err := settings.NewConfigManager(hostKey, store, cm, s, wm, settings.WithAlertManager(am), settings.WithLog(log.Named("settings")))
sm, err := settings.NewConfigManager(hostKey, store, cm, s, wm, settings.WithAlertManager(am), settings.WithLog(log.Named("settings")))
if err != nil {
return fmt.Errorf("failed to create settings manager: %w", err)
}
defer cfm.Close()
defer sm.Close()

vm, err := storage.NewVolumeManager(store, storage.WithLogger(log.Named("volumes")), storage.WithAlerter(am))
if err != nil {
Expand All @@ -271,24 +258,37 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK
}
defer contractManager.Close()

index, err := index.NewManager(store, cm, contractManager, wm, cfm, vm, index.WithLog(log.Named("index")), index.WithBatchSize(cfg.Consensus.IndexBatchSize))
index, err := index.NewManager(store, cm, contractManager, wm, sm, vm, index.WithLog(log.Named("index")), index.WithBatchSize(cfg.Consensus.IndexBatchSize))
if err != nil {
return fmt.Errorf("failed to create index manager: %w", err)
}
defer index.Close()

dr := rhp.NewDataRecorder(store, log.Named("data"))

rhp2, err := rhp2.NewSessionHandler(rhp2Listener, hostKey, rhp3Listener.Addr().String(), cm, s, wm, contractManager, cfm, vm, rhp2.WithDataMonitor(dr), rhp2.WithLog(log.Named("rhp2")))
rl, wl := sm.RHPBandwidthLimiters()
rhp2Listener, err := rhp.Listen("tcp", cfg.RHP2.Address, rhp.WithDataMonitor(dr), rhp.WithReadLimit(rl), rhp.WithWriteLimit(wl))
if err != nil {
return fmt.Errorf("failed to listen on rhp2 addr: %w", err)
}
defer rhp2Listener.Close()

rhp3Listener, err := rhp.Listen("tcp", cfg.RHP3.TCPAddress, rhp.WithDataMonitor(dr), rhp.WithReadLimit(rl), rhp.WithWriteLimit(wl))
if err != nil {
return fmt.Errorf("failed to listen on rhp3 addr: %w", err)
}
defer rhp3Listener.Close()

rhp2, err := rhp2.NewSessionHandler(rhp2Listener, hostKey, rhp3Listener.Addr().String(), cm, s, wm, contractManager, sm, vm, log.Named("rhp2"))
if err != nil {
return fmt.Errorf("failed to create rhp2 session handler: %w", err)
}
go rhp2.Serve()
defer rhp2.Close()

registry := registry.NewManager(hostKey, store, log.Named("registry"))
accounts := accounts.NewManager(store, cfm)
rhp3, err := rhp3.NewSessionHandler(rhp3Listener, hostKey, cm, s, wm, accounts, contractManager, registry, vm, cfm, rhp3.WithDataMonitor(dr), rhp3.WithSessionReporter(sr), rhp3.WithLog(log.Named("rhp3")))
accounts := accounts.NewManager(store, sm)
rhp3, err := rhp3.NewSessionHandler(rhp3Listener, hostKey, cm, s, wm, accounts, contractManager, registry, vm, sm, log.Named("rhp3"))
if err != nil {
return fmt.Errorf("failed to create rhp3 session handler: %w", err)
}
Expand All @@ -298,13 +298,12 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK
apiOpts := []api.ServerOption{
api.WithAlerts(am),
api.WithLogger(log.Named("api")),
api.WithRHPSessionReporter(sr),
api.WithWebhooks(wr),
api.WithSQLite3Store(store),
}
if !cfg.Explorer.Disable {
ex := explorer.New(cfg.Explorer.URL)
pm, err := pin.NewManager(store, cfm, ex, pin.WithLogger(log.Named("pin")))
pm, err := pin.NewManager(store, sm, ex, pin.WithLogger(log.Named("pin")))
if err != nil {
return fmt.Errorf("failed to create pin manager: %w", err)
}
Expand All @@ -314,7 +313,7 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK

web := http.Server{
Handler: webRouter{
api: jape.BasicAuth(cfg.HTTP.Password)(api.NewServer(cfg.Name, hostKey.PublicKey(), cm, s, accounts, contractManager, vm, wm, store, cfm, index, apiOpts...)),
api: jape.BasicAuth(cfg.HTTP.Password)(api.NewServer(cfg.Name, hostKey.PublicKey(), cm, s, accounts, contractManager, vm, wm, store, sm, index, apiOpts...)),
ui: hostd.Handler(),
},
ReadTimeout: 30 * time.Second,
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ require (
lukechampine.com/flagg v1.1.1
lukechampine.com/frand v1.5.1
lukechampine.com/upnp v0.3.0
nhooyr.io/websocket v1.8.17
)

require (
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,3 @@ lukechampine.com/frand v1.5.1 h1:fg0eRtdmGFIxhP5zQJzM1lFDbD6CUfu/f+7WgAZd5/w=
lukechampine.com/frand v1.5.1/go.mod h1:4VstaWc2plN4Mjr10chUD46RAVGWhpkZ5Nja8+Azp0Q=
lukechampine.com/upnp v0.3.0 h1:UVCD6eD6fmJmwak6DVE3vGN+L46Fk8edTcC6XYCb6C4=
lukechampine.com/upnp v0.3.0/go.mod h1:sOuF+fGSDKjpUm6QI0mfb82ScRrhj8bsqsD78O5nK1k=
nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y=
nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
4 changes: 2 additions & 2 deletions host/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ func (m *ConfigManager) Settings() Settings {
return m.settings
}

// BandwidthLimiters returns the rate limiters for all traffic
func (m *ConfigManager) BandwidthLimiters() (ingress, egress *rate.Limiter) {
// RHPBandwidthLimiters returns the rate limiters for all RHP traffic
func (m *ConfigManager) RHPBandwidthLimiters() (ingress, egress *rate.Limiter) {
return m.ingressLimit, m.egressLimit
}

Expand Down
82 changes: 0 additions & 82 deletions rhp/conn.go

This file was deleted.

Loading

0 comments on commit 55b9357

Please sign in to comment.