Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #50 from ipfs/test-bs
Browse files Browse the repository at this point in the history
feat: add proxy blockstore for testing against non-saturn backend
  • Loading branch information
lidel authored Feb 24, 2023
2 parents 1c55d71 + f78b4ef commit c305b3b
Show file tree
Hide file tree
Showing 11 changed files with 284 additions and 101 deletions.
31 changes: 28 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,43 @@ Local build:

```console
$ go build
$ ./bifrost-gateway --help
$ ./bifrost-gateway
```

Prebuild Docker image:
### Docker

```console
$ docker pull ipfs/bifrost-gateway:main-latest
$ docker run --rm -it --net=host ipfs/bifrost-gateway:main-latest --help
$ docker run --rm -it --net=host ipfs/bifrost-gateway:main-latest
```

When using Docker, make sure to pass necessary config via [`./docs/environment-variables.md`](./docs/environment-variables.md).

List of available revisions: https://hub.docker.com/r/ipfs/bifrost-gateway/tags

### How to run with Saturn CDN backend

[Saturn](https://strn.network) is an open-source, community-run Content Delivery Network (CDN).
`bifrost-gateway` supports it via the [Caboose](https://github.com/filecoin-saturn/caboose) backend,
which takes care of discovering and evaluating Saturn CDN peers.

See [_Saturn Backend_ in `./docs/environment-variables.md`](./docs/environment-variables.md#saturn-backend)

### How to run with local gateway

Saturn is implementation detail specific to ipfs.io infrastructure.
One can run `bifrost-gateway` without it. All you need is endpoint that supports
[verifiable response types](https://docs.ipfs.tech/reference/http/gateway/#trustless-verifiable-retrieval).

To run without Saturn and use Gateway provided by a local IPFS node like [Kubo](https://github.com/ipfs/kubo):

```console
$ PROXY_GATEWAY_URL="http://127.0.0.1:8080" ./bifrost-gateway
```

See [_Proxy Backend_ in `./docs/environment-variables.md`](./docs/environment-variables.md#proxy-backend)


### How to debug?

See [`GOLOG_LOG_LEVEL`](./docs/environment-variables.md#golog_log_level).
Expand Down
42 changes: 34 additions & 8 deletions blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@ package main

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/filecoin-saturn/caboose"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
blocks "github.com/ipfs/go-libipfs/blocks"
"github.com/ipfs/go-libipfs/gateway"
"go.uber.org/zap/zapcore"
)

var errNotImplemented = errors.New("not implemented")

const GetBlockTimeout = time.Second * 60

func newExchange(orchestrator, loggingEndpoint string, cdns *cachedDNS) (exchange.Interface, error) {
b, err := newCabooseBlockStore(orchestrator, loggingEndpoint, cdns)
if err != nil {
return nil, err
}
return &exchangeBsWrapper{bstore: b}, nil
func newExchange(bs blockstore.Blockstore) (exchange.Interface, error) {
return &exchangeBsWrapper{bstore: bs}, nil
}

type exchangeBsWrapper struct {
Expand All @@ -30,10 +33,14 @@ func (e *exchangeBsWrapper) GetBlock(ctx context.Context, c cid.Cid) (blocks.Blo
defer cancel()

if goLog.Level().Enabled(zapcore.DebugLevel) {
goLog.Debugw("block requested from strn", "cid", c.String())
goLog.Debugw("block requested from remote blockstore", "cid", c.String())
}

return e.bstore.Get(ctx, c)
blk, err := e.bstore.Get(ctx, c)
if err != nil {
return nil, gatewayError(err)
}
return blk, nil
}

func (e *exchangeBsWrapper) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) {
Expand All @@ -60,4 +67,23 @@ func (e *exchangeBsWrapper) Close() error {
return nil
}

// gatewayError translates underlying blockstore error into one that gateway code will return as HTTP 502 or 504
func gatewayError(err error) error {
if errors.Is(err, gateway.ErrGatewayTimeout) || errors.Is(err, gateway.ErrBadGateway) {
// already correct error
return err
}

// All timeouts should produce 504 Gateway Timeout
if errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, caboose.ErrSaturnTimeout) ||
// Unfortunately this is not an exported type so we have to check for the content.
strings.Contains(err.Error(), "Client.Timeout exceeded") {
return fmt.Errorf("%w: %s", gateway.ErrGatewayTimeout, err.Error())
}

// everything else returns 502 Bad Gateway
return fmt.Errorf("%w: %s", gateway.ErrBadGateway, err.Error())
}

var _ exchange.Interface = (*exchangeBsWrapper)(nil)
2 changes: 2 additions & 0 deletions blockstore_caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
)

const DefaultSaturnLogger = "http://set-STRN_LOGGER_URL"

func newCabooseBlockStore(orchestrator, loggingEndpoint string, cdns *cachedDNS) (blockstore.Blockstore, error) {
var (
orchURL *url.URL
Expand Down
3 changes: 1 addition & 2 deletions blockstore_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"errors"

"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
Expand Down Expand Up @@ -127,7 +126,7 @@ func (l *cacheBlockStore) PutMany(ctx context.Context, blks []blocks.Block) erro
}

func (l *cacheBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("not implemented")
return nil, errNotImplemented
}

func (l *cacheBlockStore) HashOnRead(enabled bool) {
Expand Down
154 changes: 154 additions & 0 deletions blockstore_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package main

import (
"context"
"fmt"
"io"
"log"
"math/rand"
"net/http"
"net/url"
"time"

"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-libipfs/blocks"
)

// Blockstore backed by a verifiable gateway. This is vendor-agnostic proxy interface,
// one can use Gateway provided by Kubo, or any other implementation that follows
// the spec for verifiable responses:
// https://docs.ipfs.tech/reference/http/gateway/#trustless-verifiable-retrieval
// https://github.com/ipfs/specs/blob/main/http-gateways/TRUSTLESS_GATEWAY.md

const (
DefaultProxyGateway = "http://127.0.0.1:8080"
DefaultKuboPRC = "http://127.0.0.1:5001"
)

type proxyBlockStore struct {
httpClient *http.Client
gatewayURL []string
validate bool
rand *rand.Rand
}

func newProxyBlockStore(gatewayURL []string, cdns *cachedDNS) blockstore.Blockstore {
s := rand.NewSource(time.Now().Unix())
rand := rand.New(s)

if len(gatewayURL) == 0 {
log.Fatal("Missing PROXY_GATEWAY_URL. See https://github.com/ipfs/bifrost-gateway/blob/main/docs/environment-variables.md")
}

return &proxyBlockStore{
gatewayURL: gatewayURL,
httpClient: &http.Client{
Timeout: GetBlockTimeout,
Transport: &withUserAgent{
// Roundtripper with increased defaults than http.Transport such that retrieving
// multiple blocks from a single gateway concurrently is fast.
RoundTripper: &http.Transport{
MaxIdleConns: 1000,
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
DialContext: cdns.dialWithCachedDNS,
},
},
},
// Enables block validation by default. Important since we are
// proxying block requests to an untrusted gateway.
validate: true,
rand: rand,
}
}

func (ps *proxyBlockStore) fetch(ctx context.Context, c cid.Cid) (blocks.Block, error) {
u, err := url.Parse(fmt.Sprintf("%s/ipfs/%s?format=raw", ps.getRandomGatewayURL(), c))
if err != nil {
return nil, err
}
resp, err := ps.httpClient.Do(&http.Request{
Method: http.MethodGet,
URL: u,
Header: http.Header{
"Accept": []string{"application/vnd.ipld.raw"},
},
})
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http error from block gateway: %s", resp.Status)
}

rb, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if ps.validate {
nc, err := c.Prefix().Sum(rb)
if err != nil {
return nil, blocks.ErrWrongHash
}
if !nc.Equals(c) {
return nil, blocks.ErrWrongHash
}
}

return blocks.NewBlockWithCid(rb, c)
}

func (ps *proxyBlockStore) Has(ctx context.Context, c cid.Cid) (bool, error) {
blk, err := ps.fetch(ctx, c)
if err != nil {
return false, err
}
return blk != nil, nil
}

func (ps *proxyBlockStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
blk, err := ps.fetch(ctx, c)
if err != nil {
return nil, err
}
return blk, nil
}

func (ps *proxyBlockStore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
blk, err := ps.fetch(ctx, c)
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}

func (ps *proxyBlockStore) HashOnRead(enabled bool) {
ps.validate = enabled
}

func (c *proxyBlockStore) Put(context.Context, blocks.Block) error {
return errNotImplemented
}

func (c *proxyBlockStore) PutMany(context.Context, []blocks.Block) error {
return errNotImplemented
}

func (c *proxyBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errNotImplemented
}

func (c *proxyBlockStore) DeleteBlock(context.Context, cid.Cid) error {
return errNotImplemented
}

func (ps *proxyBlockStore) getRandomGatewayURL() string {
return ps.gatewayURL[ps.rand.Intn(len(ps.gatewayURL))]
}

var _ blockstore.Blockstore = (*proxyBlockStore)(nil)
8 changes: 6 additions & 2 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- [`KUBO_RPC_URL`](#kubo_rpc_url)
- [`BLOCK_CACHE_SIZE`](#block_cache_size)
- [Proxy Backend](#proxy-backend)
- [`PROXY_GATEWAY_URL`](#proxy_gateway_url)
- [Saturn Backend](#saturn-backend)
- [`STRN_ORCHESTRATOR_URL`](#strn_orchestrator_url)
- [`STRN_LOGGER_URL`](#strn_logger_url)
Expand All @@ -32,8 +33,11 @@ The size of in-memory [2Q cache](https://pkg.go.dev/github.com/hashicorp/golang-

## Proxy Backend

TODO: this will be the default backend used when `STRN_ORCHESTRATOR_URL` is not set.
We will have env variable that allows customizing URL of HTTP Gateway that supports Block/CAR responses.
### `PROXY_GATEWAY_URL`

Single URL or a comma separated list of Gateway endpoints that support `?format=block|car`
responses. This is used by default with `http://127.0.0.1:8080` unless `STRN_ORCHESTRATOR_URL`
is set.

## Saturn Backend

Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/ipfs/bifrost-gateway
go 1.19

require (
github.com/filecoin-saturn/caboose v0.0.0-20230224143717-684691e6737d
github.com/filecoin-saturn/caboose v0.0.0-20230224175506-a1f20de5ba9e
github.com/gogo/protobuf v1.3.2
github.com/hashicorp/golang-lru/v2 v2.0.1
github.com/ipfs/go-blockservice v0.5.0
Expand All @@ -13,11 +13,11 @@ require (
github.com/ipfs/go-ipfs-exchange-interface v0.2.0
github.com/ipfs/go-ipld-format v0.4.0
github.com/ipfs/go-ipns v0.3.0
github.com/ipfs/go-libipfs v0.6.1-0.20230224134131-7ba1df55d53b
github.com/ipfs/go-libipfs v0.6.1-0.20230224152609-00e024995173
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/go-merkledag v0.9.0
github.com/ipfs/go-namesys v0.7.0
github.com/ipfs/go-path v0.3.0
github.com/ipfs/go-path v0.3.1
github.com/ipfs/go-unixfs v0.3.1
github.com/ipfs/go-unixfsnode v1.5.1
github.com/ipfs/interface-go-ipfs-core v0.11.1
Expand Down Expand Up @@ -92,6 +92,7 @@ require (
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/opencontainers/runtime-spec v1.0.3-0.20211123151946-c2389c3cb60a // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
Expand Down
Loading

0 comments on commit c305b3b

Please sign in to comment.