From f1ea37f79cea7e240a200c2cfeb3904ee4577b3e Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Mon, 22 Apr 2024 10:34:08 +0200 Subject: [PATCH] feat: remote backend --- gc.go | 7 +- handlers.go | 23 ++++- main.go | 139 +++++++++++++++++--------- setup.go | 274 ++++++++++++++++++++++++++++++++++++++-------------- 4 files changed, 320 insertions(+), 123 deletions(-) diff --git a/gc.go b/gc.go index 4e875ec..671d83a 100644 --- a/gc.go +++ b/gc.go @@ -8,8 +8,13 @@ import ( ) // GC is a really stupid simple algorithm where we just delete things until -// weve deleted enough things +// we've deleted enough things. It is no-op if the current setup does not have +// a blockstore. func (nd *Node) GC(ctx context.Context, todelete int64) error { + if nd.blockstore == nil { + return nil + } + keys, err := nd.blockstore.AllKeysChan(ctx) if err != nil { return err diff --git a/handlers.go b/handlers.go index c7d938c..9040649 100644 --- a/handlers.go +++ b/handlers.go @@ -85,12 +85,27 @@ func withRequestLogger(next http.Handler) http.Handler { } func setupGatewayHandler(cfg Config, nd *Node) (http.Handler, error) { - backend, err := gateway.NewBlocksBackend( - nd.bsrv, + var ( + backend gateway.IPFSBackend + err error + ) + + options := []gateway.BackendOption{ gateway.WithValueStore(nd.vs), gateway.WithNameSystem(nd.ns), - gateway.WithResolver(nd.resolver), - ) + gateway.WithResolver(nd.resolver), // May be nil, but that is fine. + } + + if len(cfg.RemoteBackends) > 0 && cfg.RemoteBackendMode == RemoteBackendCAR { + var fetcher gateway.CarFetcher + fetcher, err = gateway.NewRemoteCarFetcher(cfg.RemoteBackends, nil) + if err != nil { + return nil, err + } + backend, err = gateway.NewCarBackend(fetcher, options...) + } else { + backend, err = gateway.NewBlocksBackend(nd.bsrv, options...) + } if err != nil { return nil, err } diff --git a/main.go b/main.go index 0995c71..a86bcdf 100644 --- a/main.go +++ b/main.go @@ -248,6 +248,32 @@ Generate an identity seed and launch a gateway: EnvVars: []string{"RAINBOW_IPNS_MAX_CACHE_TTL"}, Usage: "Optional cap on caching duration for IPNS/DNSLink lookups. Set to 0 to respect original TTLs", }, + &cli.BoolFlag{ + Name: "bitswap", + Value: true, + EnvVars: []string{"RAINBOW_BITSWAP"}, + Usage: "Enable or disable Bitswap. Disabling Bitswap is incompatible with --peering-shared-cache", + }, + &cli.StringSliceFlag{ + Name: "remote-backends", + Value: cli.NewStringSlice(), + EnvVars: []string{"RAINBOW_REMOTE_BACKENDS"}, + Usage: "Trustless remote gateways to use as backend(comma-separated). You must set --bitswap=false to use this option", + }, + &cli.StringFlag{ + Name: "remote-backends-mode", + Value: "block", + EnvVars: []string{"RAINBOW_REMOTE_BACKENDS_MODE"}, + Usage: "Whether to fetch raw blocks or CARs from the remote backends. Options are 'block' or 'car'", + Action: func(ctx *cli.Context, s string) error { + switch RemoteBackendMode(s) { + case RemoteBackendBlock, RemoteBackendCAR: + return nil + default: + return errors.New("invalid value for --remote-backend-mode: use 'block' or 'car'") + } + }, + }, } app.Commands = []*cli.Command{ @@ -289,57 +315,66 @@ share the same seed as long as the indexes are different. var seed string var priv crypto.PrivKey + var peeringAddrs []peer.AddrInfo + var index int var err error - credDir := os.Getenv("CREDENTIALS_DIRECTORY") - secretsDir := ddir + bitswap := cctx.Bool("bitswap") + dhtRouting := DHTRouting(cctx.String("dht-routing")) + seedPeering := cctx.Bool("seed-peering") + noLibp2p := !bitswap && dhtRouting == DHTOff && !seedPeering - if len(credDir) > 0 { - secretsDir = credDir - } + // Only load secrets if we need Libp2p. + if !noLibp2p { + credDir := os.Getenv("CREDENTIALS_DIRECTORY") + secretsDir := ddir - // attempt to read seed from disk - seedBytes, err := os.ReadFile(filepath.Join(secretsDir, "seed")) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { - // set seed from command line or env-var - seed = cctx.String("seed") - } else { - return fmt.Errorf("error reading seed credentials: %w", err) + if len(credDir) > 0 { + secretsDir = credDir } - } else { - seed = strings.TrimSpace(string(seedBytes)) - } - - index := cctx.Int("seed-index") - if len(seed) > 0 && index >= 0 { - fmt.Printf("Deriving identity from seed[%d]\n", index) - priv, err = deriveKey(seed, deriveKeyInfo(index)) - } else { - fmt.Println("Setting identity from libp2p.key") - keyFile := filepath.Join(secretsDir, "libp2p.key") - priv, err = loadOrInitPeerKey(keyFile) - } - if err != nil { - return err - } - var peeringAddrs []peer.AddrInfo - for _, maStr := range cctx.StringSlice("peering") { - if len(seed) > 0 && index >= 0 { - maStr, err = replaceRainbowSeedWithPeer(maStr, seed) - if err != nil { - return err + // attempt to read seed from disk + seedBytes, err := os.ReadFile(filepath.Join(secretsDir, "seed")) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + // set seed from command line or env-var + seed = cctx.String("seed") + } else { + return fmt.Errorf("error reading seed credentials: %w", err) } - } else if rainbowSeedRegex.MatchString(maStr) { - return fmt.Errorf("unable to peer with %q without defining --seed-index of this instance first", maStr) + } else { + seed = strings.TrimSpace(string(seedBytes)) } - ai, err := peer.AddrInfoFromString(maStr) + index = cctx.Int("seed-index") + if len(seed) > 0 && index >= 0 { + fmt.Println("Deriving identity from seed") + priv, err = deriveKey(seed, deriveKeyInfo(index)) + } else { + fmt.Println("Setting identity from libp2p.key") + keyFile := filepath.Join(secretsDir, "libp2p.key") + priv, err = loadOrInitPeerKey(keyFile) + } if err != nil { return err } - peeringAddrs = append(peeringAddrs, *ai) + + for _, maStr := range cctx.StringSlice("peering") { + if len(seed) > 0 && index >= 0 { + maStr, err = replaceRainbowSeedWithPeer(maStr, seed) + if err != nil { + return err + } + } else if rainbowSeedRegex.MatchString(maStr) { + return fmt.Errorf("unable to peer with %q without defining --seed-index of this instance first", maStr) + } + + ai, err := peer.AddrInfoFromString(maStr) + if err != nil { + return err + } + peeringAddrs = append(peeringAddrs, *ai) + } } cfg := Config{ @@ -355,23 +390,32 @@ share the same seed as long as the indexes are different. MaxFD: cctx.Int("max-fd"), InMemBlockCache: cctx.Int64("inmem-block-cache"), RoutingV1Endpoints: cctx.StringSlice("http-routers"), - DHTRouting: DHTRouting(cctx.String("dht-routing")), + DHTRouting: dhtRouting, DHTSharedHost: cctx.Bool("dht-shared-host"), + Bitswap: bitswap, IpnsMaxCacheTTL: cctx.Duration("ipns-max-cache-ttl"), DenylistSubs: cctx.StringSlice("denylists"), Peering: peeringAddrs, PeeringCache: cctx.Bool("peering-shared-cache"), Seed: seed, SeedIndex: index, - SeedPeering: cctx.Bool("seed-peering"), + SeedPeering: seedPeering, SeedPeeringMaxIndex: cctx.Int("seed-peering-max-index"), + RemoteBackends: cctx.StringSlice("remote-backends"), + RemoteBackendMode: RemoteBackendMode(cctx.String("remote-backends-mode")), GCInterval: cctx.Duration("gc-interval"), GCThreshold: cctx.Float64("gc-threshold"), } + var gnd *Node + goLog.Debugf("Rainbow config: %+v", cfg) - gnd, err := Setup(cctx.Context, cfg, priv, cdns) + if noLibp2p { + gnd, err = SetupNoLibp2p(cctx.Context, cfg, cdns) + } else { + gnd, err = Setup(cctx.Context, cfg, priv, cdns) + } if err != nil { return err } @@ -389,11 +433,14 @@ share the same seed as long as the indexes are different. Handler: handler, } - pid, err := peer.IDFromPublicKey(priv.GetPublic()) - if err != nil { - return err + fmt.Printf("Starting %s %s\n", name, version) + if priv != nil { + pid, err := peer.IDFromPublicKey(priv.GetPublic()) + if err != nil { + return err + } + fmt.Printf("PeerID: %s\n\n", pid) } - fmt.Printf("PeerID: %s\n\n", pid) registerVersionMetric(version) registerIpfsNodeCollector(gnd) diff --git a/setup.go b/setup.go index 74870bc..feead6e 100644 --- a/setup.go +++ b/setup.go @@ -20,6 +20,7 @@ import ( bsserver "github.com/ipfs/boxo/bitswap/server" "github.com/ipfs/boxo/blockservice" "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/exchange/offline" bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" "github.com/ipfs/boxo/gateway" "github.com/ipfs/boxo/ipns" @@ -71,23 +72,30 @@ const ( DHTOff DHTRouting = "off" ) +type RemoteBackendMode string + +const ( + RemoteBackendBlock RemoteBackendMode = "block" + RemoteBackendCAR RemoteBackendMode = "car" +) + func init() { // Lets us discover our own public address with a single observation identify.ActivationThresh = 1 } type Node struct { - vs routing.ValueStore - host host.Host - + ns namesys.NameSystem + vs routing.ValueStore dataDir string - datastore datastore.Batching - blockstore blockstore.Blockstore - bs *bitswap.Bitswap bsrv blockservice.BlockService - resolver resolver.Resolver - ns namesys.NameSystem denylistSubs []*nopfs.HTTPSubscriber + + // Maybe not be set depending on the configuration: + host host.Host + datastore datastore.Batching + blockstore blockstore.Blockstore + resolver resolver.Resolver } type Config struct { @@ -112,6 +120,7 @@ type Config struct { DHTRouting DHTRouting DHTSharedHost bool IpnsMaxCacheTTL time.Duration + Bitswap bool DenylistSubs []string Peering []peer.AddrInfo @@ -122,10 +131,64 @@ type Config struct { SeedPeering bool SeedPeeringMaxIndex int + RemoteBackends []string + RemoteBackendMode RemoteBackendMode + GCInterval time.Duration GCThreshold float64 } +func SetupNoLibp2p(ctx context.Context, cfg Config, dnsCache *cachedDNS) (*Node, error) { + var err error + + cfg.DataDir, err = filepath.Abs(cfg.DataDir) + if err != nil { + return nil, err + } + + denylists, blocker, err := setupDenylists(cfg) + if err != nil { + return nil, err + } + + // The stars aligned and Libp2p does not need to be turned on at all. + if len(cfg.RemoteBackends) == 0 { + return nil, errors.New("remote backends must be set when bitswap and dht are disabled") + } + + // Setup a Value Store composed of both the remote backends and the delegated + // routers, if they exist. This vs is only used for the namesystem. + vs, err := setupRoutingNoLibp2p(cfg, dnsCache) + if err != nil { + return nil, err + } + + // Setup the remote blockstore if that's the mode we're using. + var bsrv blockservice.BlockService + if cfg.RemoteBackendMode == RemoteBackendBlock { + blkst, err := gateway.NewRemoteBlockstore(cfg.RemoteBackends, nil) + if err != nil { + return nil, err + } + + bsrv = blockservice.New(blkst, offline.Exchange(blkst)) + bsrv = nopfsipfs.WrapBlockService(bsrv, blocker) + } + + ns, err := setupNamesys(cfg, vs, blocker) + if err != nil { + return nil, err + } + + return &Node{ + vs: vs, + ns: ns, + dataDir: cfg.DataDir, + denylistSubs: denylists, + bsrv: bsrv, + }, nil +} + func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cachedDNS) (*Node, error) { var err error @@ -134,11 +197,16 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached return nil, err } - ds, err := setupDatastore(cfg) + denylists, blocker, err := setupDenylists(cfg) if err != nil { return nil, err } + n := &Node{ + dataDir: cfg.DataDir, + denylistSubs: denylists, + } + bwc := metrics.NewBandwidthCounter() cmgr, err := connmgr.NewConnManager(cfg.ConnMgrLow, cfg.ConnMgrHi, connmgr.WithGracePeriod(cfg.ConnMgrGrace)) @@ -185,20 +253,15 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached })) } - blkst := blockstore.NewBlockstore(ds, - blockstore.NoPrefix(), - // Every Has() for every written block is a transaction with a - // seek onto LSM. If not in memory it will be a pain. - // We opt to write every block Put into the blockstore. - // See also comment in blockservice. - blockstore.WriteThrough(), - ) - blkst = blockstore.NewIdStore(blkst) + ds, err := setupDatastore(cfg) + if err != nil { + return nil, err + } var ( + vs routing.ValueStore cr routing.ContentRouting pr routing.PeerRouting - vs routing.ValueStore ) opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { @@ -215,74 +278,66 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached return nil, err } - bswap := setupBitswap(ctx, cfg, h, cr, blkst) + var bsrv blockservice.BlockService + if cfg.Bitswap { + blkst := blockstore.NewBlockstore(ds, + blockstore.NoPrefix(), + // Every Has() for every written block is a transaction with a + // seek onto LSM. If not in memory it will be a pain. + // We opt to write every block Put into the blockstore. + // See also comment in blockservice. + blockstore.WriteThrough(), + ) + blkst = blockstore.NewIdStore(blkst) + n.blockstore = blkst + + bsrv = blockservice.New(blkst, setupBitswap(ctx, cfg, h, cr, blkst), + // if we are doing things right, our bitswap wantlists should + // not have blocks that we already have (see + // https://github.com/ipfs/boxo/blob/e0d4b3e9b91e9904066a10278e366c9a6d9645c7/blockservice/blockservice.go#L272). Thus + // we should not be writing many blocks that we already + // have. Thus, no point in checking whether we have a block + // before writing new blocks. + blockservice.WriteThrough(), + ) + } else { + if len(cfg.RemoteBackends) == 0 || cfg.RemoteBackendMode != RemoteBackendBlock { + return nil, errors.New("remote backends in block mode must be set when disabling bitswap") + } - err = os.Mkdir(filepath.Join(cfg.DataDir, "denylists"), 0755) - if err != nil && !errors.Is(err, fs.ErrExist) { - return nil, err - } + if cfg.PeeringCache { + return nil, errors.New("disabling bitswap is incompatible with peering cache") + } - var denylists []*nopfs.HTTPSubscriber - for _, dl := range cfg.DenylistSubs { - s, err := nopfs.NewHTTPSubscriber(dl, filepath.Join(cfg.DataDir, "denylists", filepath.Base(dl)), time.Minute) + blkst, err := gateway.NewRemoteBlockstore(cfg.RemoteBackends, nil) if err != nil { return nil, err } - denylists = append(denylists, s) - } - files, err := nopfs.GetDenylistFilesInDir(filepath.Join(cfg.DataDir, "denylists")) - if err != nil { - return nil, err + bsrv = blockservice.New(blkst, offline.Exchange(blkst)) } - blocker, err := nopfs.NewBlocker(files) - if err != nil { - return nil, err - } - - bsrv := blockservice.New(blkst, bswap, - // if we are doing things right, our bitswap wantlists should - // not have blocks that we already have (see - // https://github.com/ipfs/boxo/blob/e0d4b3e9b91e9904066a10278e366c9a6d9645c7/blockservice/blockservice.go#L272). Thus - // we should not be writing many blocks that we already - // have. Thus, no point in checking whether we have a block - // before writing new blocks. - blockservice.WriteThrough(), - ) bsrv = nopfsipfs.WrapBlockService(bsrv, blocker) - dns, err := gateway.NewDNSResolver(nil) - if err != nil { - return nil, err - } - nsOptions := []namesys.Option{namesys.WithDNSResolver(dns)} - if cfg.IpnsMaxCacheTTL > 0 { - nsOptions = append(nsOptions, namesys.WithMaxCacheTTL(cfg.IpnsMaxCacheTTL)) - } - ns, err := namesys.NewNameSystem(vs, nsOptions...) - if err != nil { - return nil, err - } - ns = nopfsipfs.WrapNameSystem(ns, blocker) - fetcherCfg := bsfetcher.NewFetcherConfig(bsrv) fetcherCfg.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser) fetcher := fetcherCfg.WithReifier(unixfsnode.Reify) r := resolver.NewBasicResolver(fetcher) r = nopfsipfs.WrapResolver(r, blocker) - return &Node{ - host: h, - blockstore: blkst, - dataDir: cfg.DataDir, - datastore: ds, - bs: bswap, - ns: ns, - vs: vs, - bsrv: bsrv, - resolver: r, - denylistSubs: denylists, - }, nil + n.host = h + n.datastore = ds + n.bsrv = bsrv + n.resolver = r + + ns, err := setupNamesys(cfg, vs, blocker) + if err != nil { + return nil, err + } + + n.vs = vs + n.ns = ns + + return n, nil } func setupDatastore(cfg Config) (datastore.Batching, error) { @@ -496,6 +551,18 @@ func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Bat vs routing.ValueStore = router ) + // If we're using a remote backend, but we also have libp2p enabled (e.g. for + // seed peering), we can still leverage the remote backend here. + if len(cfg.RemoteBackends) > 0 && cfg.RemoteBackendMode == RemoteBackendBlock { + remoteValueStore, err := gateway.NewRemoteValueStore(cfg.RemoteBackends, nil) + if err != nil { + return nil, nil, nil, err + } + vs = setupCompositeRouting(append(delegatedRouters, &routinghelpers.Compose{ + ValueStore: remoteValueStore, + }), dhtRouter) + } + // If we're using seed peering, we need to run a lighter Amino DHT for the // peering routing. We need to run a separate DHT with the main host if // the shared host is disabled, or if we're not running any DHT at all. @@ -513,6 +580,25 @@ func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Bat return cr, pr, vs, nil } +func setupRoutingNoLibp2p(cfg Config, dnsCache *cachedDNS) (routing.ValueStore, error) { + delegatedRouters, err := setupDelegatedRouting(cfg, dnsCache) + if err != nil { + return nil, err + } + + if len(cfg.RemoteBackends) > 0 && cfg.RemoteBackendMode == RemoteBackendBlock { + remoteValueStore, err := gateway.NewRemoteValueStore(cfg.RemoteBackends, nil) + if err != nil { + return nil, err + } + delegatedRouters = append(delegatedRouters, &routinghelpers.Compose{ + ValueStore: remoteValueStore, + }) + } + + return setupCompositeRouting(delegatedRouters, nil), nil +} + type bundledDHT struct { standard *dht.IpfsDHT fullRT *fullrt.FullRT @@ -691,3 +777,47 @@ func setupBitswap(ctx context.Context, cfg Config, h host.Host, cr routing.Conte return bswap } + +func setupDenylists(cfg Config) ([]*nopfs.HTTPSubscriber, *nopfs.Blocker, error) { + err := os.Mkdir(filepath.Join(cfg.DataDir, "denylists"), 0755) + if err != nil && !errors.Is(err, fs.ErrExist) { + return nil, nil, err + } + + var denylists []*nopfs.HTTPSubscriber + for _, dl := range cfg.DenylistSubs { + s, err := nopfs.NewHTTPSubscriber(dl, filepath.Join(cfg.DataDir, "denylists", filepath.Base(dl)), time.Minute) + if err != nil { + return nil, nil, err + } + denylists = append(denylists, s) + } + + files, err := nopfs.GetDenylistFilesInDir(filepath.Join(cfg.DataDir, "denylists")) + if err != nil { + return nil, nil, err + } + blocker, err := nopfs.NewBlocker(files) + if err != nil { + return nil, nil, err + } + + return denylists, blocker, nil +} + +func setupNamesys(cfg Config, vs routing.ValueStore, blocker *nopfs.Blocker) (namesys.NameSystem, error) { + dns, err := gateway.NewDNSResolver(nil) + if err != nil { + return nil, err + } + nsOptions := []namesys.Option{namesys.WithDNSResolver(dns)} + if cfg.IpnsMaxCacheTTL > 0 { + nsOptions = append(nsOptions, namesys.WithMaxCacheTTL(cfg.IpnsMaxCacheTTL)) + } + ns, err := namesys.NewNameSystem(vs, nsOptions...) + if err != nil { + return nil, err + } + ns = nopfsipfs.WrapNameSystem(ns, blocker) + return ns, nil +}