diff --git a/go.mod b/go.mod index cd31926..5d2ac3e 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( go.opentelemetry.io/otel/trace v1.14.0 go.uber.org/atomic v1.10.0 go.uber.org/zap v1.24.0 + golang.org/x/sync v0.1.0 ) require ( @@ -172,7 +173,6 @@ require ( golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect golang.org/x/mod v0.7.0 // indirect golang.org/x/net v0.7.0 // indirect - golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.6.0 // indirect golang.org/x/text v0.7.0 // indirect golang.org/x/tools v0.5.0 // indirect diff --git a/lib/graph_gateway.go b/lib/graph_gateway.go index c7222f7..8e62a4a 100644 --- a/lib/graph_gateway.go +++ b/lib/graph_gateway.go @@ -13,6 +13,8 @@ import ( "sync" "time" + "golang.org/x/sync/semaphore" + "github.com/filecoin-saturn/caboose" "github.com/ipfs/boxo/blockservice" blockstore "github.com/ipfs/boxo/blockstore" @@ -240,6 +242,14 @@ func registerGraphGatewayMetrics() *GraphGatewayMetrics { } } +var cacheLimiter = semaphore.NewWeighted(1024) +var cachePool = sync.Pool{ + New: func() any { + bs, _ := NewCacheBlockStore(1024) + return bs + }, +} + /* Implementation iteration plan: @@ -251,10 +261,7 @@ Implementation iteration plan: */ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx context.Context, path string) (gateway.IPFSBackend, func(), error) { - bstore, err := NewCacheBlockStore(1024) - if err != nil { - return nil, nil, err - } + bstore := cachePool.Get().(blockstore.Blockstore) exch := newBlockExchange(bstore, api.blockFetcher) go func(metrics *GraphGatewayMetrics) { @@ -385,6 +392,11 @@ func wrapNodeWithClose[T files.Node](node T, closeFn func()) (T, error) { } func (api *GraphGateway) Get(ctx context.Context, path gateway.ImmutablePath, byteRanges ...gateway.ByteRange) (gateway.ContentPathMetadata, *gateway.GetResponse, error) { + if err := cacheLimiter.Acquire(ctx, 1); err != nil { + return gateway.ContentPathMetadata{}, nil, err + } + defer cacheLimiter.Release(1) + rangeCount := len(byteRanges) api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "entity", "entityRanges": strconv.Itoa(rangeCount)}).Inc() @@ -432,6 +444,11 @@ func (api *GraphGateway) Get(ctx context.Context, path gateway.ImmutablePath, by } func (api *GraphGateway) GetAll(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, files.Node, error) { + if err := cacheLimiter.Acquire(ctx, 1); err != nil { + return gateway.ContentPathMetadata{}, nil, err + } + defer cacheLimiter.Release(1) + api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "all", "entityRanges": "0"}).Inc() blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+"?format=car&dag-scope=all") if err != nil { @@ -449,6 +466,11 @@ func (api *GraphGateway) GetAll(ctx context.Context, path gateway.ImmutablePath) } func (api *GraphGateway) GetBlock(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, files.File, error) { + if err := cacheLimiter.Acquire(ctx, 1); err != nil { + return gateway.ContentPathMetadata{}, nil, err + } + defer cacheLimiter.Release(1) + api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "block", "entityRanges": "0"}).Inc() // TODO: if path is `/ipfs/cid`, we should use ?format=raw blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+"?format=car&dag-scope=block") @@ -467,6 +489,11 @@ func (api *GraphGateway) GetBlock(ctx context.Context, path gateway.ImmutablePat } func (api *GraphGateway) Head(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, files.Node, error) { + if err := cacheLimiter.Acquire(ctx, 1); err != nil { + return gateway.ContentPathMetadata{}, nil, err + } + defer cacheLimiter.Release(1) + api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "entity", "entityRanges": "1"}).Inc() // TODO: we probably want to move this either to boxo, or at least to loadRequestIntoSharedBlockstoreAndBlocksGateway @@ -500,6 +527,11 @@ func (api *GraphGateway) ResolvePath(ctx context.Context, path gateway.Immutable } func (api *GraphGateway) GetCAR(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, io.ReadCloser, <-chan error, error) { + if err := cacheLimiter.Acquire(ctx, 1); err != nil { + return gateway.ContentPathMetadata{}, nil, nil, err + } + defer cacheLimiter.Release(1) + api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "all", "entityRanges": "0"}).Inc() blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+"?format=car&dag-scope=all") if err != nil {