Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

231 sql statement syntax #242

Merged
merged 6 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions internal/repository/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
func (r *JobRepository) Find(
jobId *int64,
cluster *string,
startTime *int64) (*schema.Job, error) {

startTime *int64,
) (*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
Expand All @@ -248,8 +248,8 @@ func (r *JobRepository) Find(
func (r *JobRepository) FindAll(
jobId *int64,
cluster *string,
startTime *int64) ([]*schema.Job, error) {

startTime *int64,
) ([]*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
Expand Down Expand Up @@ -292,7 +292,8 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {

func (r *JobRepository) FindConcurrentJobs(
ctx context.Context,
job *schema.Job) (*model.JobLinkResultList, error) {
job *schema.Job,
) (*model.JobLinkResultList, error) {
if job == nil {
return nil, nil
}
Expand Down Expand Up @@ -420,8 +421,8 @@ func (r *JobRepository) Stop(
jobId int64,
duration int32,
state schema.JobState,
monitoringStatus int32) (err error) {

monitoringStatus int32,
) (err error) {
stmt := sq.Update("job").
Set("job_state", state).
Set("duration", duration).
Expand All @@ -434,21 +435,27 @@ func (r *JobRepository) Stop(

func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
var cnt int
qs := fmt.Sprintf("SELECT count(*) FROM job WHERE job.start_time < %d", startTime)
err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement
_, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime)
q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
q.RunWith(r.DB).QueryRow().Scan(cnt)
qd := sq.Delete("job").Where("job.start_time < ?", startTime)
_, err := qd.RunWith(r.DB).Exec()

if err != nil {
log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err)
s, _, _ := qd.ToSql()
log.Errorf(" DeleteJobsBefore(%d) with %s: error %#v", startTime, s, err)
} else {
log.Debugf("DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt)
}
return cnt, err
}

func (r *JobRepository) DeleteJobById(id int64) error {
_, err := r.DB.Exec(`DELETE FROM job WHERE job.id = ?`, id)
qd := sq.Delete("job").Where("job.id = ?", id)
_, err := qd.RunWith(r.DB).Exec()

if err != nil {
log.Errorf("DeleteJobById(%d): error %#v", id, err)
s, _, _ := qd.ToSql()
log.Errorf("DeleteJobById(%d) with %s : error %#v", id, s, err)
} else {
log.Debugf("DeleteJobById(%d): Success", id)
}
Expand All @@ -468,8 +475,8 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
func (r *JobRepository) MarkArchived(
jobId int64,
monitoringStatus int32,
metricStats map[string]schema.JobStatistics) error {

metricStats map[string]schema.JobStatistics,
) error {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobId)
Expand Down Expand Up @@ -578,8 +585,10 @@ func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm
}
}

var ErrNotFound = errors.New("no such jobname, project or user")
var ErrForbidden = errors.New("not authorized")
var (
ErrNotFound = errors.New("no such jobname, project or user")
ErrForbidden = errors.New("not authorized")
)

func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) {
compareStr := " = ?"
Expand Down Expand Up @@ -663,7 +672,6 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
// Hosts with zero jobs running on them will not show up!
func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {

start := time.Now()
subclusters := make(map[string]map[string]int)
rows, err := sq.Select("resources", "subcluster").From("job").
Expand Down Expand Up @@ -706,7 +714,6 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
}

func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {

start := time.Now()
res, err := sq.Update("job").
Set("monitoring_status", schema.MonitoringStatusArchivingFailed).
Expand Down Expand Up @@ -735,7 +742,6 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
}

func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) {

var query sq.SelectBuilder

if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {
Expand Down
2 changes: 1 addition & 1 deletion internal/repository/migrations/mysql/01_init-schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ CREATE TABLE IF NOT EXISTS job (
);

CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY,
id INTEGER AUTO_INCREMENT PRIMARY KEY,
tag_type VARCHAR(255) NOT NULL,
tag_name VARCHAR(255) NOT NULL,
UNIQUE (tag_type, tag_name));
Expand Down
23 changes: 17 additions & 6 deletions internal/repository/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ import (

// Add the tag with id `tagId` to the job with the database id `jobId`.
func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
if _, err := r.stmtCache.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2)`, job, tag); err != nil {
log.Error("Error while running query")
q := sq.Insert("jobtag").Columns("job_id", "tag_id").Values(job, tag)

if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := q.ToSql()
log.Errorf("Error adding tag with %s: %v", s, err)
return nil, err
}

Expand All @@ -37,8 +40,11 @@ func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {

// Removes a tag from a job
func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) {
if _, err := r.stmtCache.Exec("DELETE FROM jobtag WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2", job, tag); err != nil {
log.Error("Error while running query")
q := sq.Delete("jobtag").Where("jobtag.job_id = ?", job).Where("jobtag.tag_id = ?", tag)

if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := q.ToSql()
log.Errorf("Error adding tag with %s: %v", s, err)
return nil, err
}

Expand All @@ -59,8 +65,12 @@ func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) {

// CreateTag creates a new tag with the specified type and name and returns its database id.
func (r *JobRepository) CreateTag(tagType string, tagName string) (tagId int64, err error) {
res, err := r.stmtCache.Exec("INSERT INTO tag (tag_type, tag_name) VALUES ($1, $2)", tagType, tagName)
q := sq.Insert("tag").Columns("tag_type", "tag_name").Values(tagType, tagName)

res, err := q.RunWith(r.stmtCache).Exec()
if err != nil {
s, _, _ := q.ToSql()
log.Errorf("Error inserting tag with %s: %v", s, err)
return 0, err
}

Expand Down Expand Up @@ -154,7 +164,8 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {

rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
s, _, _ := q.ToSql()
log.Errorf("Error get tags with %s: %v", s, err)
return nil, err
}

Expand Down
Loading