From c1ab0e9137b28ce8772a41b461d51159fa973c37 Mon Sep 17 00:00:00 2001 From: ameanasad Date: Mon, 18 Sep 2023 21:00:20 -0400 Subject: [PATCH 1/7] enhancement: increase test size --- pool_dynamics_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pool_dynamics_test.go b/pool_dynamics_test.go index eca678e..d34bf90 100644 --- a/pool_dynamics_test.go +++ b/pool_dynamics_test.go @@ -15,7 +15,7 @@ import ( ) const ( - nodesSize = 6 + nodesSize = 200 ) /* @@ -211,7 +211,7 @@ func TestPoolDynamics(t *testing.T) { func getHarnessAndControlGroup(t *testing.T, nodesSize int, poolSize int) (*util.CabooseHarness, map[string]string) { ch := util.BuildCabooseHarness(t, nodesSize, 3, func(config *caboose.Config) { - config.PoolTargetSize = 3 + config.PoolTargetSize = nodesSize / 2 }) ch.StartOrchestrator() From 1975f49d26b9a2a28234d4e40aa71118acc0c060 Mon Sep 17 00:00:00 2001 From: ameanasad Date: Mon, 18 Sep 2023 23:31:02 -0400 Subject: [PATCH 2/7] feat: Add tests for affinity --- caboose.go | 10 ++-- internal/util/harness.go | 2 + pool_dynamics_test.go | 108 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 114 insertions(+), 6 deletions(-) diff --git a/caboose.go b/caboose.go index de549b0..3724081 100644 --- a/caboose.go +++ b/caboose.go @@ -195,14 +195,14 @@ func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error ctx, span := spanTrace(ctx, "Fetch", trace.WithAttributes(attribute.String("path", path))) defer span.End() - return c.pool.fetchResourceWith(ctx, path, cb, c.getAffinity(ctx)) + return c.pool.fetchResourceWith(ctx, path, cb, c.GetAffinity(ctx)) } func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) { ctx, span := spanTrace(ctx, "Has", trace.WithAttributes(attribute.Stringer("cid", it))) defer span.End() - blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx)) + blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx)) if err != nil { return false, err } @@ -213,7 +213,7 @@ func 
(c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) { ctx, span := spanTrace(ctx, "Get", trace.WithAttributes(attribute.Stringer("cid", it))) defer span.End() - blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx)) + blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx)) if err != nil { return nil, err } @@ -225,14 +225,14 @@ func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) { ctx, span := spanTrace(ctx, "GetSize", trace.WithAttributes(attribute.Stringer("cid", it))) defer span.End() - blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx)) + blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx)) if err != nil { return 0, err } return len(blk.RawData()), nil } -func (c *Caboose) getAffinity(ctx context.Context) string { +func (c *Caboose) GetAffinity(ctx context.Context) string { // https://github.com/ipfs/bifrost-gateway/issues/53#issuecomment-1442732865 if affG := ctx.Value(gateway.ContentPathKey); affG != nil { contentPath := affG.(ipath.Path).String() diff --git a/internal/util/harness.go b/internal/util/harness.go index 077e2ea..4b2fc6b 100644 --- a/internal/util/harness.go +++ b/internal/util/harness.go @@ -79,6 +79,7 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt ch.CabooseActiveNodes = conf.Harness.ActiveNodes.(*caboose.NodeRing) ch.CabooseAllNodes = conf.Harness.AllNodes.(*caboose.NodeHeap) ch.CaboosePool = conf.Harness.PoolController + ch.Config = conf return ch } @@ -89,6 +90,7 @@ type CabooseHarness struct { CabooseActiveNodes *caboose.NodeRing CabooseAllNodes *caboose.NodeHeap CaboosePool state.PoolController + Config *caboose.Config gol sync.Mutex goodOrch bool diff --git a/pool_dynamics_test.go b/pool_dynamics_test.go index d34bf90..8294112 100644 --- a/pool_dynamics_test.go +++ b/pool_dynamics_test.go @@ -2,6 +2,8 @@ package caboose_test import ( "context" + cryptoRand "crypto/rand" + "fmt" "math/rand" "net/url" "testing" @@ -15,8 +17,9 @@ 
import ( ) const ( - nodesSize = 200 + nodesSize = 6 ) +const blockPathPattern = "/ipfs/%s?format=car&dag-scope=block" /* This function tests if the caboose pool converges to a set of nodes that are expected @@ -209,6 +212,94 @@ func TestPoolDynamics(t *testing.T) { } +func TestPoolAffinity(t *testing.T) { + baseStatSize := 100000 + baseStatLatency := 100 + // statVarianceFactor := 0.1 + poolRefreshNo := 10 + simReqCount := 10000 + ctx := context.Background() + cidList := generateRandomCIDs(20) + + t.Run("selected nodes remain consistent for same cid reqs", func(t *testing.T) { + ch, controlGroup := getHarnessAndControlGroup(t, nodesSize, nodesSize/2) + _, _ = ch.Caboose.Get(ctx, cidList[0]) + + goodNodes := make([]*caboose.Node, 0) + badNodes := make([]*caboose.Node, 0) + + for _, n := range ch.CabooseAllNodes.Nodes { + _, ok := controlGroup[n.URL] + if ok { + goodNodes = append(goodNodes, n) + } else { + badNodes = append(badNodes, n) + } + } + + // Send requests to control group nodes to bump their selection into the pool. + for i := 0; i < poolRefreshNo; i++ { + baseStats := util.NodeStats{ + Start: time.Now().Add(-time.Second * 2), + Latency: float64(baseStatLatency) / float64(10), + Size: float64(baseStatSize) * float64(10), + } + + ch.RecordSuccesses(t, goodNodes, baseStats, 1000) + ch.CaboosePool.DoRefresh() + } + + // Make a bunch of requests to similar cids to establish a stable hashring + for i := 0; i < simReqCount; i++ { + rand.New(rand.NewSource(time.Now().Unix())) + idx := rand.Intn(len(cidList)) + _, _ = ch.Caboose.Get(ctx, cidList[idx]) + } + ch.CaboosePool.DoRefresh() + + // Introduce new nodes by sending same stats to those nodes. 
+ for i := 0; i < poolRefreshNo/2; i++ { + baseStats := util.NodeStats{ + Start: time.Now().Add(-time.Second * 2), + Latency: float64(baseStatLatency) / float64(10), + Size: float64(baseStatSize) * float64(10), + } + + // variedStats := util.NodeStats{ + // Start: time.Now().Add(-time.Second * 2), + // Latency: float64(baseStatLatency) / (float64(10) + (1 + statVarianceFactor)), + // Size: float64(baseStatSize) * float64(10) * (1 + statVarianceFactor), + // } + + ch.RecordSuccesses(t, goodNodes, baseStats, 100) + ch.RecordSuccesses(t, badNodes, baseStats, 10) + + ch.CaboosePool.DoRefresh() + } + + // for _, i := range ch.CabooseAllNodes.Nodes { + // fmt.Println(i.URL, i.Priority(), i.PredictedLatency) + // } + + // Get the candidate nodes for a few cids from our formed cid list using + // the affinity of each cid. + for i := 0; i < 10; i++ { + rand.New(rand.NewSource(time.Now().Unix())) + idx := rand.Intn(len(cidList)) + c := cidList[idx] + aff := ch.Caboose.GetAffinity(ctx) + if aff == "" { + aff = fmt.Sprintf(blockPathPattern, c) + } + nodes, _ := ch.CabooseActiveNodes.GetNodes(aff, ch.Config.MaxRetrievalAttempts) + + // We expect that the candidate nodes are part of the "good nodes" list. 
+ assert.Contains(t, goodNodes, nodes[0]) + } + + }) +} + func getHarnessAndControlGroup(t *testing.T, nodesSize int, poolSize int) (*util.CabooseHarness, map[string]string) { ch := util.BuildCabooseHarness(t, nodesSize, 3, func(config *caboose.Config) { config.PoolTargetSize = nodesSize / 2 @@ -231,3 +322,18 @@ func getHarnessAndControlGroup(t *testing.T, nodesSize int, poolSize int) (*util return ch, controlGroup } + +func generateRandomCIDs(count int) []cid.Cid { + var cids []cid.Cid + for i := 0; i < count; i++ { + block := make([]byte, 32) + cryptoRand.Read(block) + c, _ := cid.V1Builder{ + Codec: uint64(multicodec.Raw), + MhType: uint64(multicodec.Sha2_256), + }.Sum(block) + + cids = append(cids, c) + } + return cids +} From 78f3490c747dd58b7c5f1189a98ffdee43edf67f Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 19 Sep 2023 14:39:10 +0400 Subject: [PATCH 3/7] test cache affinity --- pool_dynamics_test.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/pool_dynamics_test.go b/pool_dynamics_test.go index 8294112..79de9f3 100644 --- a/pool_dynamics_test.go +++ b/pool_dynamics_test.go @@ -222,7 +222,9 @@ func TestPoolAffinity(t *testing.T) { cidList := generateRandomCIDs(20) t.Run("selected nodes remain consistent for same cid reqs", func(t *testing.T) { - ch, controlGroup := getHarnessAndControlGroup(t, nodesSize, nodesSize/2) + // 80 nodes will be in the good pool. 20 will be added later with the same stats + // so 20% of the nodes in the pool will eventually be "new nodes" that will be added later. 
+ ch, controlGroup := getHarnessAndControlGroup(t, 100, 80) _, _ = ch.Caboose.Get(ctx, cidList[0]) goodNodes := make([]*caboose.Node, 0) @@ -277,26 +279,31 @@ func TestPoolAffinity(t *testing.T) { ch.CaboosePool.DoRefresh() } + rerouteCount := 0 + // for _, i := range ch.CabooseAllNodes.Nodes { // fmt.Println(i.URL, i.Priority(), i.PredictedLatency) // } - // Get the candidate nodes for a few cids from our formed cid list using - // the affinity of each cid. - for i := 0; i < 10; i++ { - rand.New(rand.NewSource(time.Now().Unix())) - idx := rand.Intn(len(cidList)) - c := cidList[idx] + // Get the candidate nodes for each cid in the cid list to see + for _, c := range cidList { aff := ch.Caboose.GetAffinity(ctx) if aff == "" { aff = fmt.Sprintf(blockPathPattern, c) } nodes, _ := ch.CabooseActiveNodes.GetNodes(aff, ch.Config.MaxRetrievalAttempts) - // We expect that the candidate nodes are part of the "good nodes" list. assert.Contains(t, goodNodes, nodes[0]) + for _, n := range badNodes { + n := n + if n.URL == nodes[0].URL { + rerouteCount++ + } + } } + // no more than 5 cids from the cid list of 20 should get re-routed (25%) + assert.LessOrEqual(t, rerouteCount, 4) }) } From 5e02c7fd640c248e93169469e9c3fbd37ba3cdca Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 19 Sep 2023 14:42:05 +0400 Subject: [PATCH 4/7] test cache affinity --- pool_dynamics_test.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/pool_dynamics_test.go b/pool_dynamics_test.go index 79de9f3..8d5f21c 100644 --- a/pool_dynamics_test.go +++ b/pool_dynamics_test.go @@ -222,8 +222,8 @@ func TestPoolAffinity(t *testing.T) { cidList := generateRandomCIDs(20) t.Run("selected nodes remain consistent for same cid reqs", func(t *testing.T) { - // 80 nodes will be in the good pool. 20 will be added later with the same stats - // so 20% of the nodes in the pool will eventually be "new nodes" that will be added later. + // 80 nodes will be in the good pool. 
20 will be added later with the same stats. + // So, 20% of the nodes in the pool will eventually be "new nodes" that have been added later. ch, controlGroup := getHarnessAndControlGroup(t, 100, 80) _, _ = ch.Caboose.Get(ctx, cidList[0]) @@ -267,12 +267,6 @@ func TestPoolAffinity(t *testing.T) { Size: float64(baseStatSize) * float64(10), } - // variedStats := util.NodeStats{ - // Start: time.Now().Add(-time.Second * 2), - // Latency: float64(baseStatLatency) / (float64(10) + (1 + statVarianceFactor)), - // Size: float64(baseStatSize) * float64(10) * (1 + statVarianceFactor), - // } - ch.RecordSuccesses(t, goodNodes, baseStats, 100) ch.RecordSuccesses(t, badNodes, baseStats, 10) @@ -281,11 +275,7 @@ func TestPoolAffinity(t *testing.T) { rerouteCount := 0 - // for _, i := range ch.CabooseAllNodes.Nodes { - // fmt.Println(i.URL, i.Priority(), i.PredictedLatency) - // } - - // Get the candidate nodes for each cid in the cid list to see + // Get the candidate nodes for each cid in the cid list to see if it's been rerouted to a new node. 
for _, c := range cidList { aff := ch.Caboose.GetAffinity(ctx) if aff == "" { From d8ae01e1b773e271e42ba072685ad73a381be21c Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 19 Sep 2023 14:50:11 +0400 Subject: [PATCH 5/7] remove assert --- pool_dynamics_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pool_dynamics_test.go b/pool_dynamics_test.go index 8d5f21c..2c53218 100644 --- a/pool_dynamics_test.go +++ b/pool_dynamics_test.go @@ -283,7 +283,6 @@ func TestPoolAffinity(t *testing.T) { } nodes, _ := ch.CabooseActiveNodes.GetNodes(aff, ch.Config.MaxRetrievalAttempts) - assert.Contains(t, goodNodes, nodes[0]) for _, n := range badNodes { n := n if n.URL == nodes[0].URL { From b647fab2f03a99f100cded188af0b4adfd4a6727 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 19 Sep 2023 16:21:01 +0400 Subject: [PATCH 6/7] fix test --- pool_dynamics_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pool_dynamics_test.go b/pool_dynamics_test.go index 2c53218..ddd0806 100644 --- a/pool_dynamics_test.go +++ b/pool_dynamics_test.go @@ -292,7 +292,7 @@ func TestPoolAffinity(t *testing.T) { } // no more than 5 cids from the cid list of 20 should get re-routed (25%) - assert.LessOrEqual(t, rerouteCount, 4) + assert.LessOrEqual(t, rerouteCount, 5) }) } From 0cf6c943f8c9e94d8ce308340777afb1cfa8d368 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 19 Sep 2023 16:22:24 +0400 Subject: [PATCH 7/7] address review --- pool_dynamics_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pool_dynamics_test.go b/pool_dynamics_test.go index ddd0806..dc2d4a9 100644 --- a/pool_dynamics_test.go +++ b/pool_dynamics_test.go @@ -227,15 +227,15 @@ func TestPoolAffinity(t *testing.T) { ch, controlGroup := getHarnessAndControlGroup(t, 100, 80) _, _ = ch.Caboose.Get(ctx, cidList[0]) - goodNodes := make([]*caboose.Node, 0) - badNodes := make([]*caboose.Node, 0) + existingNodes := make([]*caboose.Node, 0) + newNodes := 
make([]*caboose.Node, 0) for _, n := range ch.CabooseAllNodes.Nodes { _, ok := controlGroup[n.URL] if ok { - goodNodes = append(goodNodes, n) + existingNodes = append(existingNodes, n) } else { - badNodes = append(badNodes, n) + newNodes = append(newNodes, n) } } @@ -247,7 +247,7 @@ func TestPoolAffinity(t *testing.T) { Size: float64(baseStatSize) * float64(10), } - ch.RecordSuccesses(t, goodNodes, baseStats, 1000) + ch.RecordSuccesses(t, existingNodes, baseStats, 1000) ch.CaboosePool.DoRefresh() } @@ -267,8 +267,8 @@ func TestPoolAffinity(t *testing.T) { Size: float64(baseStatSize) * float64(10), } - ch.RecordSuccesses(t, goodNodes, baseStats, 100) - ch.RecordSuccesses(t, badNodes, baseStats, 10) + ch.RecordSuccesses(t, existingNodes, baseStats, 100) + ch.RecordSuccesses(t, newNodes, baseStats, 10) ch.CaboosePool.DoRefresh() } @@ -283,7 +283,7 @@ func TestPoolAffinity(t *testing.T) { } nodes, _ := ch.CabooseActiveNodes.GetNodes(aff, ch.Config.MaxRetrievalAttempts) - for _, n := range badNodes { + for _, n := range newNodes { n := n if n.URL == nodes[0].URL { rerouteCount++