Continue porting routine to s3backend
moebiusband73 committed Jan 26, 2024
1 parent 3bb5ae6 commit d6fd265
Showing 4 changed files with 268 additions and 93 deletions.
79 changes: 72 additions & 7 deletions pkg/archive/archive.go
@@ -5,9 +5,16 @@
package archive

import (
"bufio"
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"path/filepath"
"strconv"

"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
@@ -52,9 +59,69 @@ type JobContainer struct {
    Data *schema.JobData
}

var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
var ar ArchiveBackend
var useArchive bool
var (
    cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
    ar ArchiveBackend
    useArchive bool
)

func getPath(
    job *schema.Job,
    rootPath string,
    file string,
) string {
    return filepath.Join(
        getDirectory(job, rootPath), file)
}

func getDirectory(
    job *schema.Job,
    rootPath string,
) string {
    lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)

    return filepath.Join(
        rootPath,
        job.Cluster,
        lvl1, lvl2,
        strconv.FormatInt(job.StartTime.Unix(), 10))
}
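
For orientation, the archive lays a job out at root/cluster/jobID-div-1000/jobID-mod-1000 (zero-padded)/start-time; the two-level fan-out presumably keeps any single directory below a thousand entries. A standalone sketch of the same rule, with made-up values:

package main

import (
    "fmt"
    "path/filepath"
    "strconv"
)

func main() {
    // Hypothetical job: ID 1235992 on cluster "emmy", started 2021-01-01 UTC.
    jobID, start := int64(1235992), int64(1609459200)
    lvl1 := fmt.Sprintf("%d", jobID/1000)   // "1235"
    lvl2 := fmt.Sprintf("%03d", jobID%1000) // "992"
    fmt.Println(filepath.Join("/var/lib/job-archive", "emmy", lvl1, lvl2,
        strconv.FormatInt(start, 10)))
    // Output: /var/lib/job-archive/emmy/1235/992/1609459200
}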

func loadJobMeta(b []byte) (*schema.JobMeta, error) {
    if config.Keys.Validate {
        if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil {
            return &schema.JobMeta{}, fmt.Errorf("validate job meta: %v", err)
        }
    }

    return DecodeJobMeta(bytes.NewReader(b))
}

func loadJobData(f io.Reader, key string, isCompressed bool) (schema.JobData, error) {
    if isCompressed {
        r, err := gzip.NewReader(f)
        if err != nil {
            log.Errorf(" %v", err)
            return nil, err
        }
        defer r.Close()

        if config.Keys.Validate {
            if err := schema.Validate(schema.Data, r); err != nil {
                return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
            }
        }

        return DecodeJobData(r, key)
    } else {
        if config.Keys.Validate {
            if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil {
                return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
            }
        }
        return DecodeJobData(bufio.NewReader(f), key)
    }
}
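
This signature change is the heart of the port: loadJobData now consumes any io.Reader plus a key used for decoding, instead of opening a file itself, so filesystem and object-store backends can share it. A package-internal test sketch (assuming bytes, compress/gzip, and testing are imported, that validation is disabled in the test config, and that an empty JSON object is a decodable stand-in for a real data.json):

func TestLoadJobDataFromMemory(t *testing.T) {
    // Build a gzipped payload in memory; no filesystem involved.
    var buf bytes.Buffer
    zw := gzip.NewWriter(&buf)
    zw.Write([]byte("{}")) // minimal stand-in payload (assumption)
    zw.Close()

    if _, err := loadJobData(&buf, "data.json.gz", true); err != nil {
        t.Fatalf("loadJobData: %v", err)
    }
}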

func Init(rawConfig json.RawMessage, disableArchive bool) error {
    useArchive = !disableArchive
@@ -95,8 +162,8 @@ func GetHandle() ArchiveBackend {
func LoadAveragesFromArchive(
    job *schema.Job,
    metrics []string,
    data [][]schema.Float) error {

    data [][]schema.Float,
) error {
    metaFile, err := ar.LoadJobMeta(job)
    if err != nil {
        log.Warn("Error while loading job metadata from archiveBackend")
@@ -115,7 +182,6 @@ func LoadAveragesFromArchive(
}

func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {

    metaFile, err := ar.LoadJobMeta(job)
    if err != nil {
        log.Warn("Error while loading job metadata from archiveBackend")
@@ -128,7 +194,6 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
// If the job is archived, find its `meta.json` file and override the tags list
// in that JSON file. If the job is not archived, nothing is done.
func UpdateTags(job *schema.Job, tags []*schema.Tag) error {

    if job.State == schema.JobStateRunning || !useArchive {
        return nil
    }
88 changes: 15 additions & 73 deletions pkg/archive/fsBackend.go
@@ -5,9 +5,7 @@
package archive

import (
"bufio"
"bytes"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
@@ -20,7 +18,6 @@ import (
    "text/tabwriter"
    "time"

"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/util"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
@@ -43,71 +40,6 @@ type clusterInfo struct {
    diskSize float64
}

func getDirectory(
    job *schema.Job,
    rootPath string,
) string {
    lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)

    return filepath.Join(
        rootPath,
        job.Cluster,
        lvl1, lvl2,
        strconv.FormatInt(job.StartTime.Unix(), 10))
}

func getPath(
    job *schema.Job,
    rootPath string,
    file string,
) string {
    return filepath.Join(
        getDirectory(job, rootPath), file)
}

func loadJobMeta(b []byte) (*schema.JobMeta, error) {
    if config.Keys.Validate {
        if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil {
            return &schema.JobMeta{}, fmt.Errorf("validate job meta: %v", err)
        }
    }

    return DecodeJobMeta(bytes.NewReader(b))
}

func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
    f, err := os.Open(filename)
    if err != nil {
        log.Errorf("fsBackend LoadJobData()- %v", err)
        return nil, err
    }
    defer f.Close()

    if isCompressed {
        r, err := gzip.NewReader(f)
        if err != nil {
            log.Errorf(" %v", err)
            return nil, err
        }
        defer r.Close()

        if config.Keys.Validate {
            if err := schema.Validate(schema.Data, r); err != nil {
                return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
            }
        }

        return DecodeJobData(r, filename)
    } else {
        if config.Keys.Validate {
            if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil {
                return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
            }
        }
        return DecodeJobData(bufio.NewReader(f), filename)
    }
}

func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
    var config FsArchiveConfig
    if err := json.Unmarshal(rawConfig, &config); err != nil {
@@ -368,15 +300,21 @@ func (fsa *FsArchive) CompressLast(starttime int64) int64 {
}

func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
    var isCompressed bool = true
    isCompressed := true
    filename := getPath(job, fsa.path, "data.json.gz")

    if !util.CheckFileExists(filename) {
        filename = getPath(job, fsa.path, "data.json")
        isCompressed = false
    }

    return loadJobData(filename, isCompressed)
    f, err := os.Open(filename)
    if err != nil {
        log.Errorf("fsBackend LoadJobData()- %v", err)
        return nil, err
    }
    defer f.Close()
    return loadJobData(f, filename, isCompressed)
}
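
An object-store backend can now mirror this method almost line for line, swapping os.Open for a GET on the object key. A hypothetical sketch only — the S3Archive type, its client and bucket fields, and the choice of minio-go (with its context import) are assumptions, not the actual s3Backend this commit is porting toward:

func (s3a *S3Archive) LoadJobData(job *schema.Job) (schema.JobData, error) {
    isCompressed := true
    key := getPath(job, s3a.path, "data.json.gz")

    // StatObject plays the role of util.CheckFileExists (a HEAD request).
    ctx := context.Background()
    if _, err := s3a.client.StatObject(ctx, s3a.bucket, key, minio.StatObjectOptions{}); err != nil {
        key = getPath(job, s3a.path, "data.json")
        isCompressed = false
    }

    obj, err := s3a.client.GetObject(ctx, s3a.bucket, key, minio.GetObjectOptions{})
    if err != nil {
        log.Errorf("s3Backend LoadJobData()- %v", err)
        return nil, err
    }
    defer obj.Close()

    return loadJobData(obj, key, isCompressed)
}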

func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
@@ -393,13 +331,11 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
    b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
    if err != nil {
        log.Errorf("LoadClusterCfg() > open file error: %v", err)
        // if config.Keys.Validate {
        if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
            log.Warnf("Validate cluster config: %v\n", err)
            return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
        }
    }
    // }
    return DecodeCluster(bytes.NewReader(b))
}
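
Note that, as committed, the schema.Validate call sits inside the ReadFile error branch — a leftover of the commented-out config.Keys.Validate guard — so a cluster.json that reads successfully is never validated, while a failed read validates a nil buffer. The presumably intended shape, validating unconditionally since this commit drops the config import from this file (a sketch, not part of the commit):

    b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
    if err != nil {
        log.Errorf("LoadClusterCfg() > open file error: %v", err)
        return &schema.Cluster{}, err
    }
    if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
        log.Warnf("Validate cluster config: %v\n", err)
        return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
    }
    return DecodeCluster(bytes.NewReader(b))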

@@ -458,7 +394,13 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
        isCompressed = false
    }

    data, err := loadJobData(filename, isCompressed)
    f, err := os.Open(filename)
    if err != nil {
        log.Errorf("fsBackend LoadJobData()- %v", err)
    }
    defer f.Close()

    data, err := loadJobData(f, filename, isCompressed)
    if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
        log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
    }
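
One caveat in Iter as committed: a failed os.Open only logs, so a nil *os.File reaches loadJobData, and the defer accumulates for every file the walk visits. A small hypothetical helper — essentially the filename-based loadJobData this commit removed — would restore per-file close semantics and short-circuit on error (a sketch, not what the commit does):

func loadJobDataFile(filename string, isCompressed bool) (schema.JobData, error) {
    f, err := os.Open(filename)
    if err != nil {
        log.Errorf("fsBackend loadJobDataFile()- %v", err)
        return nil, err
    }
    defer f.Close() // closes when this helper returns, not when Iter's walk ends

    return loadJobData(f, filename, isCompressed)
}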
