Skip to content

Commit

Permalink
feat!: independent DHT and Routing V1 options
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Apr 8, 2024
1 parent 7fcc50e commit 2a02d63
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gateway-conformance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:
kuboNodeMultiaddr=$(ipfs --api=/ip4/127.0.0.1/tcp/5001 swarm addrs local --id | head -n 1)
# run gw
./rainbow --routing=http://127.0.0.1:8080 --peering=$kuboNodeMultiaddr &
./rainbow --routing-v1-endpoints=http://127.0.0.1:8080 --dht-routing=off --peering=$kuboNodeMultiaddr &
working-directory: rainbow

# 6. Run the gateway-conformance tests
Expand Down
2 changes: 2 additions & 0 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
func mustTestNode(t *testing.T, cfg Config) *Node {
cfg.DataDir = t.TempDir()
cfg.BlockstoreType = "flatfs"
cfg.DHTRouting = DHTStandard
cfg.RoutingV1Endpoints = []string{cidContactEndpoint}

ctx := context.Background()

Expand Down
24 changes: 20 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,25 @@ Generate an identity seed and launch a gateway:
EnvVars: []string{"RAINBOW_MAX_FD"},
Usage: "Maximum number of file descriptors. Defaults to 50% of the process' limit",
},
&cli.StringSliceFlag{
Name: "routing-v1-endpoints",
Value: cli.NewStringSlice(cidContactEndpoint),
EnvVars: []string{"RAINBOW_ROUTING_V1_ENDPOINTS"},
Usage: "Routing V1 endpoints to use for routing (comma-separated)",
},

Check warning on line 175 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L170-L175

Added lines #L170 - L175 were not covered by tests
&cli.StringFlag{
Name: "routing",
Value: "",
Usage: "RoutingV1 Endpoint (otherwise Amino DHT and cid.contact is used)",
Name: "dht-routing",
Value: "accelerated",
EnvVars: []string{"RAINBOW_DHT_ROUTING"},
Usage: "Use the Amino DHT for routing. Options are 'accelerated', 'standard' and 'off'",
Action: func(ctx *cli.Context, s string) error {
switch DHTRouting(s) {
case DHTAccelerated, DHTStandard, DHTOff:
return nil
default:
return errors.New("invalid value for --dht-routing: use 'accelerated', 'standard' or 'off'")

Check warning on line 186 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L177-L186

Added lines #L177 - L186 were not covered by tests
}
},
},
&cli.BoolFlag{
Name: "dht-shared-host",
Expand Down Expand Up @@ -297,7 +312,8 @@ share the same seed as long as the indexes are different.
MaxMemory: cctx.Uint64("max-memory"),
MaxFD: cctx.Int("max-fd"),
InMemBlockCache: cctx.Int64("inmem-block-cache"),
RoutingV1: cctx.String("routing"),
RoutingV1Endpoints: cctx.StringSlice("routing-v1-endpoints"),
DHTRouting: DHTRouting(cctx.String("dht-routing")),

Check warning on line 316 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L315-L316

Added lines #L315 - L316 were not covered by tests
DHTSharedHost: cctx.Bool("dht-shared-host"),
IpnsMaxCacheTTL: cctx.Duration("ipns-max-cache-ttl"),
DenylistSubs: cctx.StringSlice("denylists"),

Check warning on line 319 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L319

Added line #L319 was not covered by tests
Expand Down
114 changes: 66 additions & 48 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,15 @@ func init() {
}
}

const ipniFallbackEndpoint = "https://cid.contact"
const cidContactEndpoint = "https://cid.contact"

type DHTRouting string

const (
DHTAccelerated DHTRouting = "accelerated"
DHTStandard DHTRouting = "standard"
DHTOff DHTRouting = "off"
)

type Node struct {
vs routing.ValueStore
Expand Down Expand Up @@ -97,7 +105,8 @@ type Config struct {
GatewayDomains []string
SubdomainGatewayDomains []string
TrustlessGatewayDomains []string
RoutingV1 string
RoutingV1Endpoints []string
DHTRouting DHTRouting
DHTSharedHost bool
IpnsMaxCacheTTL time.Duration

Expand Down Expand Up @@ -176,9 +185,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
)
blkst = blockstore.NewIdStore(blkst)

var pr routing.PeerRouting
var vs routing.ValueStore
var cr routing.ContentRouting
var router routing.Routing

// Increase per-host connection pool since we are making lots of concurrent requests.
httpClient := &http.Client{
Expand All @@ -201,17 +208,21 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
}

opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
if cfg.RoutingV1 != "" {
routingClient, err := delegatedHTTPContentRouter(cfg.RoutingV1, routingv1client.WithStreamResultsRequired(), routingv1client.WithHTTPClient(httpClient))
var routingV1Routers []routing.Routing
for _, endpoint := range cfg.RoutingV1Endpoints {
rv1Opts := []routingv1client.Option{routingv1client.WithHTTPClient(httpClient)}
if endpoint != cidContactEndpoint {
rv1Opts = append(rv1Opts, routingv1client.WithStreamResultsRequired())

Check warning on line 215 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L215

Added line #L215 was not covered by tests
}
httpClient, err := delegatedHTTPContentRouter(endpoint, rv1Opts...)
if err != nil {
return nil, err
}
pr = routingClient
vs = routingClient
cr = routingClient
} else {
// If there are no delegated routing endpoints run an accelerated Amino DHT client and send IPNI requests to cid.contact
routingV1Routers = append(routingV1Routers, httpClient)
}

var dhtRouter routing.Routing
if cfg.DHTRouting != DHTOff {
var dhtHost host.Host
if cfg.DHTSharedHost {
dhtHost = h
Expand All @@ -237,54 +248,61 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
return nil, err
}

fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{KeyBook: h.Peerstore()},
}),
dht.Datastore(ds),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.BucketSize(20),
))
if err != nil {
return nil, err
if cfg.DHTRouting == DHTAccelerated {
fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{KeyBook: h.Peerstore()},
}),
dht.Datastore(ds),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.BucketSize(20),
))
if err != nil {
return nil, err

Check warning on line 263 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L252-L263

Added lines #L252 - L263 were not covered by tests
}
dhtRouter = &bundledDHT{
standard: standardClient,
fullRT: fullRTClient,

Check warning on line 267 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L265-L267

Added lines #L265 - L267 were not covered by tests
}
} else {
dhtRouter = standardClient
}
}

dhtRouter := &bundledDHT{
standard: standardClient,
fullRT: fullRTClient,
}
if len(routingV1Routers) == 0 && dhtRouter == nil {
return nil, errors.New("no routers available")

Check warning on line 275 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L275

Added line #L275 was not covered by tests
}

// we want to also use the default HTTP routers, so wrap the FullRT client
// in a parallel router that calls them in parallel
httpRouters, err := delegatedHTTPContentRouter(ipniFallbackEndpoint, routingv1client.WithHTTPClient(httpClient))
if err != nil {
return nil, err
}
routers := []*routinghelpers.ParallelRouter{
{
if len(routingV1Routers) == 0 {
router = dhtRouter

Check warning on line 279 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L279

Added line #L279 was not covered by tests
} else {
var routers []*routinghelpers.ParallelRouter

if dhtRouter != nil {
routers = append(routers, &routinghelpers.ParallelRouter{
Router: dhtRouter,
ExecuteAfter: 0,
DoNotWaitForSearchValue: true,
IgnoreError: false,
},
{
})
}

for _, routingV1Router := range routingV1Routers {
routers = append(routers, &routinghelpers.ParallelRouter{
Timeout: 15 * time.Second,
Router: httpRouters,
Router: routingV1Router,
ExecuteAfter: 0,
DoNotWaitForSearchValue: true,
IgnoreError: true,
},
})
}
router := routinghelpers.NewComposableParallel(routers)

pr = router
vs = router
cr = router
router = routinghelpers.NewComposableParallel(routers)
}

return pr, nil
return router, nil
}))
h, err := libp2p.New(opts...)
if err != nil {
Expand All @@ -302,7 +320,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
}

bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
bn := bsnet.NewFromIpfsHost(h, cr)
bn := bsnet.NewFromIpfsHost(h, router)
bswap := bsclient.New(bsctx, bn, blkst,
// default is 1 minute to search for a random live-want (1
// CID). I think we want to search for random live-wants more
Expand Down Expand Up @@ -357,7 +375,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
if cfg.IpnsMaxCacheTTL > 0 {
nsOptions = append(nsOptions, namesys.WithMaxCacheTTL(cfg.IpnsMaxCacheTTL))
}
ns, err := namesys.NewNameSystem(vs, nsOptions...)
ns, err := namesys.NewNameSystem(router, nsOptions...)
if err != nil {
return nil, err
}
Expand All @@ -376,7 +394,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
datastore: ds,
bsClient: bswap,
ns: ns,
vs: vs,
vs: router,
bsrv: bsrv,
resolver: r,
bwc: bwc,
Expand Down

0 comments on commit 2a02d63

Please sign in to comment.