diff --git a/agent.go b/agent.go new file mode 100644 index 0000000..5df7ea5 --- /dev/null +++ b/agent.go @@ -0,0 +1,76 @@ +package ants + +import ( + "regexp" + "strconv" + "strings" +) + +var ( + semverRegex = regexp.MustCompile(`.*(?P0|[1-9]\d*)\.(?P0|[1-9]\d*)\.(?P0|[1-9]\d*).*`) + hexRegex = regexp.MustCompile(`^[a-fA-F0-9]+$`) +) + +type agentVersionInfo struct { + full string + typ string + major int + minor int + patch int + hash string +} + +func (avi *agentVersionInfo) Semver() [3]int { + return [3]int{avi.major, avi.minor, avi.patch} +} + +func parseAgentVersion(av string) agentVersionInfo { + avi := agentVersionInfo{ + full: av, + } + + switch { + case av == "": + return avi + case av == "celestia-celestia": + avi.typ = "celestia-celestia" + return avi + case strings.HasPrefix(av, "celestia-node/celestia/"): + // fallthrough + default: + avi.typ = "other" + return avi + } + + parts := strings.Split(av, "/") + if len(parts) > 2 { + switch parts[2] { + case "bridge", "full", "light": + avi.typ = parts[2] + default: + avi.typ = "other" + } + } + + if len(parts) > 3 { + matches := semverRegex.FindStringSubmatch(parts[3]) + if matches != nil { + for i, name := range semverRegex.SubexpNames() { + switch name { + case "major": + avi.major, _ = strconv.Atoi(matches[i]) + case "minor": + avi.minor, _ = strconv.Atoi(matches[i]) + case "patch": + avi.patch, _ = strconv.Atoi(matches[i]) + } + } + } + } + + if len(parts) > 4 && hexRegex.MatchString(parts[4]) { + avi.hash = parts[4] + } + + return avi +} diff --git a/agent_test.go b/agent_test.go new file mode 100644 index 0000000..6677cb3 --- /dev/null +++ b/agent_test.go @@ -0,0 +1,113 @@ +package ants + +import ( + "reflect" + "testing" +) + +func Test_parseAgentVersion(t *testing.T) { + tests := []struct { + av string + want agentVersionInfo + }{ + { + av: "", + want: agentVersionInfo{}, + }, + { + av: "celestia-node/celestia/bridge/v0.17.1/078c291", + want: agentVersionInfo{ + full: "celestia-node/celestia/bridge/v0.17.1/078c291", + typ: "bridge", + major: 0, + minor: 17, + patch: 1, + hash: "078c291", + }, + }, + { + av: "celestia-node/celestia/full/v0.17.2/57f8bd8", + want: agentVersionInfo{ + full: "celestia-node/celestia/full/v0.17.2/57f8bd8", + typ: "full", + major: 0, + minor: 17, + patch: 2, + hash: "57f8bd8", + }, + }, + { + av: "celestia-node/celestia/random/v4.46.6/57f8bd8", + want: agentVersionInfo{ + full: "celestia-node/celestia/random/v4.46.6/57f8bd8", + typ: "other", + major: 4, + minor: 46, + patch: 6, + hash: "57f8bd8", + }, + }, + { + av: "celestia-node/celestia/light/vv0.14.0/13439cc", + want: agentVersionInfo{ + full: "celestia-node/celestia/light/vv0.14.0/13439cc", + typ: "light", + major: 0, + minor: 14, + patch: 0, + hash: "13439cc", + }, + }, + { + av: "celestia-node/celestia/light/v0.20.3-15-gbd3105b9/bd3105b", + want: agentVersionInfo{ + full: "celestia-node/celestia/light/v0.20.3-15-gbd3105b9/bd3105b", + typ: "light", + major: 0, + minor: 20, + patch: 3, + hash: "bd3105b", + }, + }, + { + av: "celestia-node/celestia/full/v0.18.0-refs-tags-v0-20-1-mocha.0/353141f", + want: agentVersionInfo{ + full: "celestia-node/celestia/full/v0.18.0-refs-tags-v0-20-1-mocha.0/353141f", + typ: "full", + major: 0, + minor: 18, + patch: 0, + hash: "353141f", + }, + }, + { + av: "celestia-node/celestia/light/unknown/unknown", + want: agentVersionInfo{ + full: "celestia-node/celestia/light/unknown/unknown", + typ: "light", + }, + }, + { + av: "celestia-celestia", + want: agentVersionInfo{ + full: "celestia-celestia", + typ: "celestia-celestia", + }, + }, + { + av: "celestiant", + want: agentVersionInfo{ + full: "celestiant", + typ: "other", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.av, func(t *testing.T) { + if got := parseAgentVersion(tt.av); !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseAgentVersion() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/db/migrations/000002_alter_requests_table_add_node_type.down.sql b/db/migrations/000002_alter_requests_table_add_node_type.down.sql new file mode 100644 index 0000000..9c6cdf5 --- /dev/null +++ b/db/migrations/000002_alter_requests_table_add_node_type.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE requests + DROP COLUMN IF EXISTS agent_version_type, + DROP COLUMN IF EXISTS agent_version_semver; \ No newline at end of file diff --git a/db/migrations/000002_alter_requests_table_add_node_type.up.sql b/db/migrations/000002_alter_requests_table_add_node_type.up.sql new file mode 100644 index 0000000..f8563b4 --- /dev/null +++ b/db/migrations/000002_alter_requests_table_add_node_type.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE requests + ADD COLUMN agent_version_type LowCardinality(String) AFTER agent_version, + ADD COLUMN agent_version_semver Array(Int16) AFTER agent_version_type; diff --git a/db/migrations/000003_create_node_population_table.down.sql b/db/migrations/000003_create_node_population_table.down.sql new file mode 100644 index 0000000..fc14a33 --- /dev/null +++ b/db/migrations/000003_create_node_population_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS node_population; \ No newline at end of file diff --git a/db/migrations/000003_create_node_population_table.up.sql b/db/migrations/000003_create_node_population_table.up.sql new file mode 100644 index 0000000..f6a3e6b --- /dev/null +++ b/db/migrations/000003_create_node_population_table.up.sql @@ -0,0 +1,7 @@ +CREATE TABLE node_population ( + timestamp DateTime, + remote_multihash AggregateFunction(uniqExact, String), + agent_version_type LowCardinality(String), + agent_version_semver Array(Int16) +) ENGINE = AggregatingMergeTree() + PRIMARY KEY (timestamp, agent_version_type) \ No newline at end of file diff --git a/db/migrations/000004_create_node_population_materialized_view.down.sql b/db/migrations/000004_create_node_population_materialized_view.down.sql new file mode 100644 index 0000000..32482f3 --- /dev/null +++ b/db/migrations/000004_create_node_population_materialized_view.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS node_population_mv; \ No newline at end of file diff --git a/db/migrations/000004_create_node_population_materialized_view.up.sql b/db/migrations/000004_create_node_population_materialized_view.up.sql new file mode 100644 index 0000000..d2afe4c --- /dev/null +++ b/db/migrations/000004_create_node_population_materialized_view.up.sql @@ -0,0 +1,8 @@ +CREATE MATERIALIZED VIEW node_population_mv TO node_population AS +SELECT + toStartOfTenMinutes(started_at) as timestamp, + uniqExactState(remote_multihash) as remote_multihash, + agent_version_type, + agent_version_semver +FROM requests +GROUP BY timestamp, agent_version_type, agent_version_semver \ No newline at end of file diff --git a/db/migrations/000005_alter_requests_table_ttl.down.sql b/db/migrations/000005_alter_requests_table_ttl.down.sql new file mode 100644 index 0000000..dce9c99 --- /dev/null +++ b/db/migrations/000005_alter_requests_table_ttl.down.sql @@ -0,0 +1 @@ +ALTER TABLE requests MODIFY TTL toDateTime(started_at) + INTERVAL 180 DAY; \ No newline at end of file diff --git a/db/migrations/000005_alter_requests_table_ttl.up.sql b/db/migrations/000005_alter_requests_table_ttl.up.sql new file mode 100644 index 0000000..f84a445 --- /dev/null +++ b/db/migrations/000005_alter_requests_table_ttl.up.sql @@ -0,0 +1 @@ +ALTER TABLE requests MODIFY TTL toDateTime(started_at) + INTERVAL 90 DAY; \ No newline at end of file diff --git a/db/models.go b/db/models.go index 1b71a62..f3e80dd 100644 --- a/db/models.go +++ b/db/models.go @@ -9,14 +9,16 @@ import ( ) type Request struct { - UUID uuid.UUID `ch:"id"` - QueenID string `ch:"queen_id"` - AntID peer.ID `ch:"ant_multihash"` - RemoteID peer.ID `ch:"remote_multihash"` - RequestType pb.Message_MessageType `ch:"request_type"` - AgentVersion string `ch:"agent_version"` - Protocols []string `ch:"protocols"` - StartedAt time.Time `ch:"started_at"` - KeyID string `ch:"key_multihash"` - MultiAddresses []string `ch:"multi_addresses"` + UUID uuid.UUID `ch:"id"` + QueenID string `ch:"queen_id"` + AntID peer.ID `ch:"ant_multihash"` + RemoteID peer.ID `ch:"remote_multihash"` + RequestType pb.Message_MessageType `ch:"request_type"` + AgentVersion string `ch:"agent_version"` + AgentVersionType string `ch:"agent_version_type"` + AgentVersionSemVer [3]int `ch:"agent_version_semver"` + Protocols []string `ch:"protocols"` + StartedAt time.Time `ch:"started_at"` + KeyID string `ch:"key_multihash"` + MultiAddresses []string `ch:"multi_addresses"` } diff --git a/queen.go b/queen.go index 6c38cf8..0acc949 100644 --- a/queen.go +++ b/queen.go @@ -3,6 +3,7 @@ package ants import ( "context" "fmt" + "sort" "strconv" "time" @@ -55,7 +56,7 @@ type Queen struct { peerstore peerstore.Peerstore datastore ds.Batching - agentsCache *lru.Cache[string, string] + agentsCache *lru.Cache[string, agentVersionInfo] protocolsCache *lru.Cache[string, []protocol.ID] ants []*Ant @@ -79,7 +80,7 @@ func NewQueen(clickhouseClient db.Client, cfg *QueenConfig) (*Queen, error) { return nil, fmt.Errorf("creating in-memory leveldb: %w", err) } - agentsCache, err := lru.New[string, string](cfg.CacheSize) + agentsCache, err := lru.New[string, agentVersionInfo](cfg.CacheSize) if err != nil { return nil, fmt.Errorf("init agents cache: %w", err) } @@ -185,16 +186,18 @@ func (q *Queen) consumeAntsEvents(ctx context.Context) { maddrStrs[i] = maddr.String() } + avi := parseAgentVersion(evt.AgentVersion) + // cache agent version - if evt.AgentVersion == "" { + if avi.full == "" { var found bool - evt.AgentVersion, found = q.agentsCache.Get(evt.Remote.String()) + avi, found = q.agentsCache.Get(evt.Remote.String()) q.cfg.Telemetry.CacheHitCounter.Add(ctx, 1, metric.WithAttributes( attribute.String("hit", strconv.FormatBool(found)), attribute.String("cache", "agent_version"), )) } else { - q.agentsCache.Add(evt.Remote.String(), evt.AgentVersion) + q.agentsCache.Add(evt.Remote.String(), avi) } // cache protocols @@ -211,6 +214,7 @@ func (q *Queen) consumeAntsEvents(ctx context.Context) { q.protocolsCache.Add(evt.Remote.String(), evt.Protocols) } protocolStrs := protocol.ConvertToStrings(protocols) + sort.Strings(protocolStrs) uuidv7, err := uuid.NewV7() if err != nil { @@ -219,22 +223,23 @@ func (q *Queen) consumeAntsEvents(ctx context.Context) { } request := &db.Request{ - UUID: uuidv7, - QueenID: q.id, - AntID: evt.Self, - RemoteID: evt.Remote, - RequestType: evt.Type, - AgentVersion: evt.AgentVersion, - Protocols: protocolStrs, - StartedAt: evt.Timestamp, - KeyID: evt.Target.B58String(), - MultiAddresses: maddrStrs, + UUID: uuidv7, + QueenID: q.id, + AntID: evt.Self, + RemoteID: evt.Remote, + RequestType: evt.Type, + AgentVersion: evt.AgentVersion, + AgentVersionType: avi.typ, + AgentVersionSemVer: avi.Semver(), + Protocols: protocolStrs, + StartedAt: evt.Timestamp, + KeyID: evt.Target.B58String(), + MultiAddresses: maddrStrs, } requests = append(requests, request) if len(requests) >= q.cfg.BatchSize { - if err = q.clickhouseClient.BulkInsertRequests(ctx, requests); err != nil { logger.Errorf("Error inserting requests: %v", err) }