TaskFlow is an efficient task executor and scheduler: it schedules jobs and tasks, and multiple workers can pick up those tasks and execute them.
Import the taskflow-gosdk:
go get github.com/nikhilbhatia08/taskflow/taskflow-gosdk
package tasks
import (
"context"
"encoding/json"
"log"
"fmt"
"time"
taskflowgosdk "github.com/nikhilbhatia08/taskflow/taskflow-gosdk"
)
// EmailPayload is the value that EmailDelivery JSON-encodes and submits as
// the job payload. It has no struct tags, so encoding/json uses the field
// names themselves as keys; the spelling "Reciever" is therefore part of the
// serialized format and is kept as-is.
type EmailPayload struct {
	EmailSenderId, EmailRecieverId, EmailBody string
}
// EmailDelivery creates a task made of a queue name, a payload and a number
// of retries, and submits it to TaskFlow.
//
// It connects using the job service and queue service addresses, JSON-encodes
// a sample EmailPayload, and submits it to the "EmailQueue" queue with
// Retries set to 5. A connection or encoding failure is returned wrapped with
// a short description of the step that failed; the underlying error remains
// reachable through errors.Is / errors.As.
func EmailDelivery() error {
	// The configurations of the jobservice and the queueservice.
	client, err := taskflowgosdk.NewServer("localhost:9003", "localhost:9002")
	if err != nil {
		return fmt.Errorf("connecting to taskflow: %w", err)
	}

	payload, err := json.Marshal(EmailPayload{
		EmailSenderId:   "SomeSenderId",
		EmailRecieverId: "SomeRecieverId",
		EmailBody:       "Some body",
	})
	if err != nil {
		return fmt.Errorf("encoding email payload: %w", err)
	}

	// NOTE(review): whatever NewJob returns is discarded here, exactly as in
	// the original example. If the SDK reports submission failures through a
	// return value, it should be checked — confirm against the SDK signature.
	client.NewJob(&taskflowgosdk.CreateJobRequest{
		QueueName: "EmailQueue",
		Payload:   string(payload),
		Retries:   5,
	})
	return nil
}
Create a worker and start executing the jobs:
package tasks
import (
"context"
"encoding/json"
"log"
"fmt"
"time"
taskflowgosdk "github.com/nikhilbhatia08/taskflow/taskflow-gosdk"
)
// EmailPayload repeats the payload type of the job-creation example so the
// worker side shares the same field names. It has no struct tags, so
// encoding/json would use the field names as keys; the spelling "Reciever"
// is kept so both sides stay compatible.
type EmailPayload struct {
	EmailSenderId, EmailRecieverId, EmailBody string
}
// EmailWorker creates a worker that polls the task queue and executes jobs.
//
// It connects using the job service and queue service addresses, then calls
// Run for the "EmailQueue" queue with EmailDeliveryHandler as the handler.
// A connection failure is returned wrapped with context; the underlying
// error remains reachable through errors.Is / errors.As.
func EmailWorker() error {
	// The configurations of the jobservice and the queueservice.
	worker, err := taskflowgosdk.NewServer("localhost:9003", "localhost:9002")
	if err != nil {
		return fmt.Errorf("connecting to taskflow: %w", err)
	}

	cfg := &taskflowgosdk.RunConfigurations{
		QueueName: "EmailQueue",
		Handler:   EmailDeliveryHandler,
	}
	// NOTE(review): whatever Run returns is discarded, as in the original
	// example; whether Run blocks or reports errors is not visible here —
	// confirm against the SDK.
	worker.Run(cfg)
	return nil
}
// EmailDeliveryHandler is the handler that EmailWorker registers for the
// "EmailQueue" queue. It receives a context and the job to process and
// returns an error.
//
// This is a template: replace the placeholder with real handling logic.
// NOTE(review): presumably a nil return marks the job as done and a non-nil
// return makes it eligible for retry — confirm against the SDK.
func EmailDeliveryHandler(ctx context.Context, emailJob *taskflowgosdk.Job) error {
	// Write the job handler logic here.

	// A function with a result must end in a return statement; without this
	// line the example does not compile ("missing return").
	return nil
}
Here's a brief overview of the project's directory structure:
- `cmd/`: Contains the main entry points for the scheduler, coordinator, task queue and worker services.
- `pkg/`: Contains the core logic for the scheduler, coordinator, task queue and worker services.
- `data/`: Contains SQL scripts to initialize the database.
- `tests/`: Contains integration tests.
- `*-dockerfile`: Dockerfiles for building the scheduler, coordinator, task queue and worker services.
- `docker-compose.yml`: Docker Compose configuration file for spinning up the entire cluster.