Skip to content

Commit

Permalink
pipeline rate measured since last metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso committed Sep 26, 2024
1 parent f7d667b commit 7435f41
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type Metrics struct {
InternalMsPerRec float64
InternalRecordsPerSec float64

// PipelineRate is calculated as: total_number_of_records/total_time,
// where total_time is measured from when Conduit read the first record
// PipelineRate is calculated as: number of records since last metrics/time since last metrics,
// where total_time is measured from when Conduit read the lastMetrics record
// to when it wrote the last record
// (i.e. it includes the time a record spent in sources, processors and destinations)
PipelineRate uint64
Expand Down Expand Up @@ -195,8 +195,8 @@ func (c *consolePrinter) close() error {
}

type collector struct {
first Metrics
metricsURL string
lastMetrics Metrics
metricsURL string
}

func newCollector(baseURL string) (collector, error) {
Expand All @@ -219,7 +219,7 @@ func (c *collector) init() error {
if err != nil {
return err
}
c.first = first
c.lastMetrics = first
return nil
}

Expand All @@ -238,8 +238,8 @@ func (c *collector) collect() (Metrics, error) {
m.InternalRecordsPerSec = math.Round(float64(count) / totalInternalTime)
m.InternalMsPerRec = (totalInternalTime / float64(count)) * 1000

timeSinceFirstMetric := uint64(time.Since(c.first.MeasuredAt).Seconds())
m.PipelineRate = (count - c.first.Count) / timeSinceFirstMetric
timeSinceFirstMetric := uint64(time.Since(c.lastMetrics.MeasuredAt).Seconds())
m.PipelineRate = (count - c.lastMetrics.Count) / timeSinceFirstMetric
m.bytes = c.getSourceByteMetrics(metricFamilies)
m.BytesPerSec = units.HumanSize(m.bytes / float64(timeSinceFirstMetric))
m.MeasuredAt = time.Now()
Expand Down Expand Up @@ -364,16 +364,21 @@ func main() {

for {
time.Sleep(*interval)

metrics, err := c.collect()
if err != nil {
fmt.Printf("couldn't collect metrics: %v", err)
os.Exit(1)
}

err = p.print(metrics)
if err != nil {
fmt.Printf("couldn't print metrics: %v", err)
os.Exit(1)
}

c.lastMetrics = metrics

if time.Now().After(until) {
break
}
Expand Down

0 comments on commit 7435f41

Please sign in to comment.