Skip to content

Commit

Permalink
Restructure callback and schedule interface
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Linhuber <[email protected]>
  • Loading branch information
robertjndw and Mtze committed Sep 22, 2023
1 parent eccc444 commit e52bd02
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 9 deletions.
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/hades.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions HadesScheduler/k8s.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package main

import "github.com/Mtze/HadesCI/shared/payload"

type JobScheduler interface {
ScheduleJob(job payload.BuildJob) error
}

type K8sScheduler struct{}

func (k *K8sScheduler) ScheduleJob(job payload.BuildJob) error {
return nil
}
9 changes: 2 additions & 7 deletions HadesScheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/Mtze/HadesCI/shared/queue"
"github.com/Mtze/HadesCI/shared/utils"

amqp "github.com/rabbitmq/amqp091-go"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -63,12 +62,8 @@ func main() {

var forever chan struct{}

f := func(ch <-chan amqp.Delivery) {
for d := range ch {
log.Printf("Received a message: %s", d.Body)
}
}
BuildQueue.Dequeue(f)
scheduler := K8sScheduler{}
BuildQueue.Dequeue(scheduler.ScheduleJob)

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
Expand Down
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ services:
- "5672:5672"
networks:
- hades
environment:
- RABBITMQ_DEFAULT_USER
- RABBITMQ_DEFAULT_PASS
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 30s
Expand Down
12 changes: 10 additions & 2 deletions shared/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (q *Queue) Enqueue(ctx context.Context, msg payload.BuildJob) error {
return nil
}

func (q *Queue) Dequeue(callback func(<-chan amqp.Delivery)) error {
func (q *Queue) Dequeue(callback func(job payload.BuildJob) error) error {
msgs, err := q.channel.Consume(
q.queue.Name, // queue
"", // consumer
Expand All @@ -94,6 +94,14 @@ func (q *Queue) Dequeue(callback func(<-chan amqp.Delivery)) error {
return err
}

go callback(msgs)
for d := range msgs {
var buildJob payload.BuildJob
err := json.Unmarshal(d.Body, &buildJob)
if err != nil {
log.WithError(err).Warn("error unmarshalling message %s", d.Body)
}

go callback(buildJob)
}
return nil
}

0 comments on commit e52bd02

Please sign in to comment.