Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix prometheus init data race #966

Merged
merged 2 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions components/prometheus/collector/collection.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,41 @@
package collector

import (
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/runtime/options"
)

type Collection struct {
CollectionName string
metrics map[string]*Metric
metrics *shrinkingmap.ShrinkingMap[string, *Metric]
}

func NewCollection(name string, opts ...options.Option[Collection]) *Collection {
return options.Apply(&Collection{
CollectionName: name,
metrics: make(map[string]*Metric),
}, opts, func(c *Collection) {
for _, m := range c.metrics {
m.Namespace = c.CollectionName
m.initPromMetric()
}
metrics: shrinkingmap.New[string, *Metric](),
}, opts, func(collection *Collection) {
collection.metrics.ForEach(func(_ string, metric *Metric) bool {
metric.Namespace = collection.CollectionName
metric.initPromMetric()

return true
})
})
}

func (c *Collection) GetMetric(metricName string) *Metric {
if metric, exists := c.metrics[metricName]; exists {
return metric
metric, exists := c.metrics.Get(metricName)
if !exists {
return nil
}

return nil
return metric
}

func (c *Collection) addMetric(metric *Metric) {
if metric != nil {
c.metrics[metric.Name] = metric
c.metrics.Set(metric.Name, metric)
}
}

Expand Down
57 changes: 34 additions & 23 deletions components/prometheus/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,50 @@ package collector

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/iotaledger/hive.go/ds/shrinkingmap"
)

// Collector is responsible for creation and collection of metrics for the prometheus.
type Collector struct {
Registry *prometheus.Registry
collections map[string]*Collection
collections *shrinkingmap.ShrinkingMap[string, *Collection]
}

// New creates an instance of Manager and creates a new prometheus registry for the protocol metrics collection.
func New() *Collector {
return &Collector{
Registry: prometheus.NewRegistry(),
collections: make(map[string]*Collection),
collections: shrinkingmap.New[string, *Collection](),
}
}

func (c *Collector) RegisterCollection(coll *Collection) {
c.collections[coll.CollectionName] = coll
for _, m := range coll.metrics {
c.Registry.MustRegister(m.promMetric)
if m.initValueFunc != nil {
metricValue, labelValues := m.initValueFunc()
m.update(metricValue, labelValues...)
func (c *Collector) RegisterCollection(collection *Collection) {
c.collections.Set(collection.CollectionName, collection)
collection.metrics.ForEach(func(_ string, metric *Metric) bool {
c.Registry.MustRegister(metric.promMetric)
if metric.initValueFunc != nil {
metricValue, labelValues := metric.initValueFunc()
metric.update(metricValue, labelValues...)
}
if m.initFunc != nil {
m.initFunc()
if metric.initFunc != nil {
metric.initFunc()
}
}

return true
})
}

// Collect collects all metrics from the registered collections.
func (c *Collector) Collect() {
for _, collection := range c.collections {
for _, metric := range collection.metrics {
c.collections.ForEach(func(_ string, collection *Collection) bool {
collection.metrics.ForEach(func(_ string, metric *Metric) bool {
metric.collect()
}
}
return true
})

return true
})
}

// Update updates the value of the existing metric defined by the subsystem and metricName.
Expand Down Expand Up @@ -78,11 +85,14 @@ func (c *Collector) ResetMetric(namespace string, metricName string) {
}

func (c *Collector) Shutdown() {
for _, collection := range c.collections {
for _, metric := range collection.metrics {
c.collections.ForEach(func(_ string, collection *Collection) bool {
collection.metrics.ForEach(func(_ string, metric *Metric) bool {
metric.shutdown()
}
}
return true
})

return true
})
}

func (c *Collector) getMetric(subsystem string, metricName string) *Metric {
Expand All @@ -95,9 +105,10 @@ func (c *Collector) getMetric(subsystem string, metricName string) *Metric {
}

func (c *Collector) getCollection(subsystem string) *Collection {
if collection, exists := c.collections[subsystem]; exists {
return collection
collection, exists := c.collections.Get(subsystem)
if !exists {
return nil
}

return nil
return collection
}
47 changes: 24 additions & 23 deletions components/prometheus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import (

func init() {
Component = &app.Component{
Name: "Prometheus",
DepsFunc: func(cDeps dependencies) { deps = cDeps },
Params: params,
Provide: provide,
Run: run,
Name: "Prometheus",
DepsFunc: func(cDeps dependencies) { deps = cDeps },
Params: params,
Provide: provide,
Configure: configure,
Run: run,
IsEnabled: func(_ *dig.Container) bool {
return ParamsMetrics.Enabled
},
Expand All @@ -56,17 +57,32 @@ type dependencies struct {
Collector *collector.Collector
}

func run() error {
Component.LogInfo("Starting Prometheus exporter ...")
func provide(c *dig.Container) error {
return c.Provide(collector.New)
}

func configure() error {
if ParamsMetrics.GoMetrics {
deps.Collector.Registry.MustRegister(collectors.NewGoCollector())
}
if ParamsMetrics.ProcessMetrics {
deps.Collector.Registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
}

registerMetrics()
deps.Collector.RegisterCollection(TangleMetrics)
deps.Collector.RegisterCollection(ConflictMetrics)
deps.Collector.RegisterCollection(InfoMetrics)
deps.Collector.RegisterCollection(DBMetrics)
deps.Collector.RegisterCollection(CommitmentsMetrics)
deps.Collector.RegisterCollection(SlotMetrics)
deps.Collector.RegisterCollection(AccountMetrics)
deps.Collector.RegisterCollection(SchedulerMetrics)

return nil
}

func run() error {
Component.LogInfo("Starting Prometheus exporter ...")

return Component.Daemon().BackgroundWorker("Prometheus exporter", func(ctx context.Context) {
Component.LogInfo("Starting Prometheus exporter ... done")
Expand Down Expand Up @@ -118,18 +134,3 @@ func run() error {
Component.LogInfo("Stopping Prometheus exporter ... done")
}, daemon.PriorityMetrics)
}

func provide(c *dig.Container) error {
return c.Provide(collector.New)
}

func registerMetrics() {
deps.Collector.RegisterCollection(TangleMetrics)
deps.Collector.RegisterCollection(ConflictMetrics)
deps.Collector.RegisterCollection(InfoMetrics)
deps.Collector.RegisterCollection(DBMetrics)
deps.Collector.RegisterCollection(CommitmentsMetrics)
deps.Collector.RegisterCollection(SlotMetrics)
deps.Collector.RegisterCollection(AccountMetrics)
deps.Collector.RegisterCollection(SchedulerMetrics)
}
4 changes: 2 additions & 2 deletions pkg/toolset/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func benchmarkIO(args []string) error {
ts := time.Now()

lastStatusTime := time.Now()
for i := 0; i < objectCnt; i++ {
for i := range objectCnt {
// one read operation and one write operation per cycle
batchWriter.Enqueue(newBenchmarkObject(store, writeDoneWaitGroup, iotago_tpkg.RandBytes(32), iotago_tpkg.RandBytes(size)))

Expand Down Expand Up @@ -154,7 +154,7 @@ func benchmarkCPU(args []string) error {
}
}()

for i := 0; i < numWorkers; i++ {
for range numWorkers {
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion pkg/toolset/pwd_hash.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//nolint:err113
package toolset

import (
Expand Down Expand Up @@ -128,7 +129,6 @@ func hashPasswordAndSalt(args []string) error {
}

if *outputJSONFlag {

result := struct {
Password string `json:"passwordHash"`
Salt string `json:"passwordSalt"`
Expand Down
1 change: 1 addition & 0 deletions pkg/toolset/toolset.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
)

const (
//nolint:gosec // there is no hardcoded password
passwordEnvKey = "IOTA_CORE_TOOL_PASSWORD"

// printStatusInterval is the interval for printing status messages.
Expand Down
Loading