Skip to content

Commit

Permalink
Fix importers and add Energy footprint to import
Browse files Browse the repository at this point in the history
  • Loading branch information
moebiusband73 committed Nov 16, 2024
1 parent 210a7d3 commit cdd45ce
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 3 deletions.
32 changes: 31 additions & 1 deletion internal/importer/handleImport.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"math"
"os"
"strings"

Expand Down Expand Up @@ -84,14 +85,43 @@ func HandleImportFlag(flag string) error {
}

name := fmt.Sprintf("%s_%s", fp, statType)
job.Footprint[fp] = repository.LoadJobStat(&job, name, statType)

job.Footprint[name] = repository.LoadJobStat(&job, fp, statType)
}

job.RawFootprint, err = json.Marshal(job.Footprint)
if err != nil {
log.Warn("Error while marshaling job footprint")
return err
}

job.EnergyFootprint = make(map[string]float64)
var totalEnergy float64
var energy float64

for _, fp := range sc.EnergyFootprint {
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil {
// Note: For DB data, calculate and save as kWh
// Energy: Power (in Watts) * Time (in Seconds)
if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules)
} else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt)
// Unit: ( W * s ) / 3600 / 1000 = kWh ; Rounded to 2 nearest digits
energy = math.Round(((repository.LoadJobStat(&job, fp, "avg")*float64(job.Duration))/3600/1000)*100) / 100
}
} else {
log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, job.ID)
}

job.EnergyFootprint[fp] = energy
totalEnergy += energy
}

job.Energy = (math.Round(totalEnergy*100) / 100)
if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil {
log.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", job.ID)
return err
}

job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
log.Warn("Error while marshaling job resources")
Expand Down
29 changes: 29 additions & 0 deletions internal/importer/initDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package importer
import (
"encoding/json"
"fmt"
"math"
"strings"
"time"

Expand Down Expand Up @@ -70,6 +71,7 @@ func InitDB() error {
log.Errorf("cannot get subcluster: %s", err.Error())
return err
}

job.Footprint = make(map[string]float64)

for _, fp := range sc.Footprint {
Expand All @@ -90,6 +92,33 @@ func InitDB() error {
return err
}

job.EnergyFootprint = make(map[string]float64)
var totalEnergy float64
var energy float64

for _, fp := range sc.EnergyFootprint {
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil {
// Note: For DB data, calculate and save as kWh
// Energy: Power (in Watts) * Time (in Seconds)
if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules)
} else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt)
// Unit: ( W * s ) / 3600 / 1000 = kWh ; Rounded to 2 nearest digits
energy = math.Round(((repository.LoadJobStat(jobMeta, fp, "avg")*float64(jobMeta.Duration))/3600/1000)*100) / 100
}
} else {
log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, jobMeta.ID)
}

job.EnergyFootprint[fp] = energy
totalEnergy += energy
}

job.Energy = (math.Round(totalEnergy*100) / 100)
if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil {
log.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", jobMeta.ID)
return err
}

job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
log.Errorf("repository initDB(): %v", err)
Expand Down
4 changes: 2 additions & 2 deletions internal/repository/jobCreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (

const NamedJobInsert string = `INSERT INTO job (
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, resources, meta_data
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data
) VALUES (
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :resources, :meta_data
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
);`

func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) {
Expand Down

0 comments on commit cdd45ce

Please sign in to comment.