Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
Merge branch 'aa/test-simulator' into feat/port-Caboose-main
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 authored Sep 20, 2023
2 parents 3f63a01 + 552ea1b commit da9ad17
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 5 deletions.
9 changes: 5 additions & 4 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error
ctx, span := spanTrace(ctx, "Fetch", trace.WithAttributes(attribute.String("path", path)))
defer span.End()


if err == nil {
sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: tid,
Expand All @@ -236,7 +237,7 @@ func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
ctx, span := spanTrace(ctx, "Has", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return false, err
}
Expand All @@ -247,7 +248,7 @@ func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
ctx, span := spanTrace(ctx, "Get", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return nil, err
}
Expand All @@ -259,14 +260,14 @@ func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) {
ctx, span := spanTrace(ctx, "GetSize", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}

func (c *Caboose) getAffinity(ctx context.Context) string {
func (c *Caboose) GetAffinity(ctx context.Context) string {
// https://github.com/ipfs/bifrost-gateway/issues/53#issuecomment-1442732865
if affG := ctx.Value(gateway.ContentPathKey); affG != nil {
contentPath := affG.(ipath.Path).String()
Expand Down
2 changes: 2 additions & 0 deletions internal/util/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt
ch.CabooseActiveNodes = conf.Harness.ActiveNodes.(*caboose.NodeRing)
ch.CabooseAllNodes = conf.Harness.AllNodes.(*caboose.NodeHeap)
ch.CaboosePool = conf.Harness.PoolController
ch.Config = conf
return ch
}

Expand All @@ -102,6 +103,7 @@ type CabooseHarness struct {
CabooseActiveNodes *caboose.NodeRing
CabooseAllNodes *caboose.NodeHeap
CaboosePool state.PoolController
Config *caboose.Config

gol sync.Mutex
goodOrch bool
Expand Down
104 changes: 103 additions & 1 deletion pool_dynamics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package caboose_test

import (
"context"
cryptoRand "crypto/rand"
"fmt"
"math/rand"
"net/url"
"testing"
Expand All @@ -17,6 +19,7 @@ import (
const (
nodesSize = 6
)
const blockPathPattern = "/ipfs/%s?format=car&dag-scope=block"

/*
This function tests if the caboose pool converges to a set of nodes that are expected
Expand Down Expand Up @@ -209,9 +212,93 @@ func TestPoolDynamics(t *testing.T) {

}

func TestPoolAffinity(t *testing.T) {
baseStatSize := 100000
baseStatLatency := 100
// statVarianceFactor := 0.1
poolRefreshNo := 10
simReqCount := 10000
ctx := context.Background()
cidList := generateRandomCIDs(20)

t.Run("selected nodes remain consistent for same cid reqs", func(t *testing.T) {
// 80 nodes will be in the good pool. 20 will be added later with the same stats.
// So, 20% of the nodes in the pool will eventually be "new nodes" that have been added later.
ch, controlGroup := getHarnessAndControlGroup(t, 100, 80)
_, _ = ch.Caboose.Get(ctx, cidList[0])

existingNodes := make([]*caboose.Node, 0)
newNodes := make([]*caboose.Node, 0)

for _, n := range ch.CabooseAllNodes.Nodes {
_, ok := controlGroup[n.URL]
if ok {
existingNodes = append(existingNodes, n)
} else {
newNodes = append(newNodes, n)
}
}

// Send requests to control group nodes to bump their selection into the pool.
for i := 0; i < poolRefreshNo; i++ {
baseStats := util.NodeStats{
Start: time.Now().Add(-time.Second * 2),
Latency: float64(baseStatLatency) / float64(10),
Size: float64(baseStatSize) * float64(10),
}

ch.RecordSuccesses(t, existingNodes, baseStats, 1000)
ch.CaboosePool.DoRefresh()
}

// Make a bunch of requests to similar cids to establish a stable hashring
for i := 0; i < simReqCount; i++ {
rand.New(rand.NewSource(time.Now().Unix()))
idx := rand.Intn(len(cidList))
_, _ = ch.Caboose.Get(ctx, cidList[idx])
}
ch.CaboosePool.DoRefresh()

// Introduce new nodes by sendng same stats to those nodes.
for i := 0; i < poolRefreshNo/2; i++ {
baseStats := util.NodeStats{
Start: time.Now().Add(-time.Second * 2),
Latency: float64(baseStatLatency) / float64(10),
Size: float64(baseStatSize) * float64(10),
}

ch.RecordSuccesses(t, existingNodes, baseStats, 100)
ch.RecordSuccesses(t, newNodes, baseStats, 10)

ch.CaboosePool.DoRefresh()
}

rerouteCount := 0

// Get the candidate nodes for each cid in the cid list to see if it's been rerouted to a new node.
for _, c := range cidList {
aff := ch.Caboose.GetAffinity(ctx)
if aff == "" {
aff = fmt.Sprintf(blockPathPattern, c)
}
nodes, _ := ch.CabooseActiveNodes.GetNodes(aff, ch.Config.MaxRetrievalAttempts)

for _, n := range newNodes {
n := n
if n.URL == nodes[0].URL {
rerouteCount++
}
}
}

// no more than 5 cids from the cid list of 20 should get re-routed (25%)
assert.LessOrEqual(t, rerouteCount, 5)
})
}

func getHarnessAndControlGroup(t *testing.T, nodesSize int, poolSize int) (*util.CabooseHarness, map[string]string) {
ch := util.BuildCabooseHarness(t, nodesSize, 3, func(config *caboose.Config) {
config.PoolTargetSize = 3
config.PoolTargetSize = nodesSize / 2
})

ch.StartOrchestrator()
Expand All @@ -231,3 +318,18 @@ func getHarnessAndControlGroup(t *testing.T, nodesSize int, poolSize int) (*util

return ch, controlGroup
}

func generateRandomCIDs(count int) []cid.Cid {
var cids []cid.Cid
for i := 0; i < count; i++ {
block := make([]byte, 32)
cryptoRand.Read(block)
c, _ := cid.V1Builder{
Codec: uint64(multicodec.Raw),
MhType: uint64(multicodec.Sha2_256),
}.Sum(block)

cids = append(cids, c)
}
return cids
}

0 comments on commit da9ad17

Please sign in to comment.