Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
Reformat.
Convert to query builder.
Add descriptive error log messages.
  • Loading branch information
moebiusband73 committed Mar 6, 2024
1 parent dd887cb commit aa6336e
Showing 1 changed file with 26 additions and 20 deletions.
46 changes: 26 additions & 20 deletions internal/repository/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
func (r *JobRepository) Find(
jobId *int64,
cluster *string,
startTime *int64) (*schema.Job, error) {

startTime *int64,
) (*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
Expand All @@ -248,8 +248,8 @@ func (r *JobRepository) Find(
func (r *JobRepository) FindAll(
jobId *int64,
cluster *string,
startTime *int64) ([]*schema.Job, error) {

startTime *int64,
) ([]*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
Expand Down Expand Up @@ -292,7 +292,8 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {

func (r *JobRepository) FindConcurrentJobs(
ctx context.Context,
job *schema.Job) (*model.JobLinkResultList, error) {
job *schema.Job,
) (*model.JobLinkResultList, error) {
if job == nil {
return nil, nil
}
Expand Down Expand Up @@ -420,8 +421,8 @@ func (r *JobRepository) Stop(
jobId int64,
duration int32,
state schema.JobState,
monitoringStatus int32) (err error) {

monitoringStatus int32,
) (err error) {
stmt := sq.Update("job").
Set("job_state", state).
Set("duration", duration).
Expand All @@ -434,21 +435,27 @@ func (r *JobRepository) Stop(

func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
var cnt int
qs := fmt.Sprintf("SELECT count(*) FROM job WHERE job.start_time < %d", startTime)
err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement
_, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime)
q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
q.RunWith(r.DB).QueryRow().Scan(cnt)
qd := sq.Delete("job").Where("job.start_time < ?", startTime)
_, err := qd.RunWith(r.DB).Exec()

if err != nil {
log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err)
s, _, _ := qd.ToSql()
log.Errorf(" DeleteJobsBefore(%d) with %s: error %#v", startTime, s, err)
} else {
log.Debugf("DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt)
}
return cnt, err
}

func (r *JobRepository) DeleteJobById(id int64) error {
_, err := r.DB.Exec(`DELETE FROM job WHERE job.id = ?`, id)
qd := sq.Delete("job").Where("job.id = ?", id)
_, err := qd.RunWith(r.DB).Exec()

if err != nil {
log.Errorf("DeleteJobById(%d): error %#v", id, err)
s, _, _ := qd.ToSql()
log.Errorf("DeleteJobById(%d) with %s : error %#v", id, s, err)
} else {
log.Debugf("DeleteJobById(%d): Success", id)
}
Expand All @@ -468,8 +475,8 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
func (r *JobRepository) MarkArchived(
jobId int64,
monitoringStatus int32,
metricStats map[string]schema.JobStatistics) error {

metricStats map[string]schema.JobStatistics,
) error {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobId)
Expand Down Expand Up @@ -578,8 +585,10 @@ func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm
}
}

var ErrNotFound = errors.New("no such jobname, project or user")
var ErrForbidden = errors.New("not authorized")
var (
ErrNotFound = errors.New("no such jobname, project or user")
ErrForbidden = errors.New("not authorized")
)

func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) {
compareStr := " = ?"
Expand Down Expand Up @@ -663,7 +672,6 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
// Hosts with zero jobs running on them will not show up!
func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {

start := time.Now()
subclusters := make(map[string]map[string]int)
rows, err := sq.Select("resources", "subcluster").From("job").
Expand Down Expand Up @@ -706,7 +714,6 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
}

func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {

start := time.Now()
res, err := sq.Update("job").
Set("monitoring_status", schema.MonitoringStatusArchivingFailed).
Expand Down Expand Up @@ -735,7 +742,6 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
}

func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) {

var query sq.SelectBuilder

if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {
Expand Down

0 comments on commit aa6336e

Please sign in to comment.