Skip to content

Commit

Permalink
Graceful shutdown servers and workers (#113)
Browse files Browse the repository at this point in the history
* ft: graceful shutdown servers and workers

* add new lectures to README file
  • Loading branch information
phamlequang authored Feb 3, 2024
1 parent 9544012 commit 3a4535f
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 25 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ This course is designed with a lot of details, so that everyone, even with very
- Lecture #68: [Docker compose: port + volume mapping](https://www.youtube.com/watch?v=nJBT5SKENAw&list=PLy_6D98if3ULEtXtNSY_2qN21VCKgoQAE)
- Lecture #69: [How to install & use binary packages in Go](https://www.youtube.com/watch?v=TnJ4ssoNvkY&list=PLy_6D98if3ULEtXtNSY_2qN21VCKgoQAE)
- Lecture #70: [Implement role-based access control (RBAC) in Go](https://www.youtube.com/watch?v=Py7dRhtuJ3E&list=PLy_6D98if3ULEtXtNSY_2qN21VCKgoQAE)
- Lecture #71: [Grant AWS EKS cluster access to Postgres and Redis using security group](https://www.youtube.com/watch?v=pPXYu6QQGE8&list=PLy_6D98if3ULEtXtNSY_2qN21VCKgoQAE)
- Lecture #72: [Deploy gRPC + HTTP server to AWS EKS cluster](https://www.youtube.com/watch?v=Pd7aeh014nU&list=PLy_6D98if3ULEtXtNSY_2qN21VCKgoQAE)
- Lecture #73: [Don't lose money on AWS](https://www.youtube.com/watch?v=VEf7IpUn6BQ&list=PLy_6D98if3ULEtXtNSY_2qN21VCKgoQAE)

## Simple bank service

Expand Down
136 changes: 111 additions & 25 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package main

import (
"context"
"errors"
"net"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres"
Expand All @@ -23,11 +26,18 @@ import (
"github.com/techschool/simplebank/pb"
"github.com/techschool/simplebank/util"
"github.com/techschool/simplebank/worker"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/encoding/protojson"
)

var interruptSignals = []os.Signal{
os.Interrupt,
syscall.SIGTERM,
syscall.SIGINT,
}

func main() {
config, err := util.LoadConfig(".")
if err != nil {
Expand All @@ -38,7 +48,10 @@ func main() {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
}

connPool, err := pgxpool.New(context.Background(), config.DBSource)
ctx, stop := signal.NotifyContext(context.Background(), interruptSignals...)
defer stop()

connPool, err := pgxpool.New(ctx, config.DBSource)
if err != nil {
log.Fatal().Err(err).Msg("cannot connect to db")
}
Expand All @@ -52,9 +65,17 @@ func main() {
}

taskDistributor := worker.NewRedisTaskDistributor(redisOpt)
go runTaskProcessor(config, redisOpt, store)
go runGatewayServer(config, store, taskDistributor)
runGrpcServer(config, store, taskDistributor)

waitGroup, ctx := errgroup.WithContext(ctx)

runTaskProcessor(ctx, waitGroup, config, redisOpt, store)
runGatewayServer(ctx, waitGroup, config, store, taskDistributor)
runGrpcServer(ctx, waitGroup, config, store, taskDistributor)

err = waitGroup.Wait()
if err != nil {
log.Fatal().Err(err).Msg("error from wait group")
}
}

func runDBMigration(migrationURL string, dbSource string) {
Expand All @@ -70,17 +91,40 @@ func runDBMigration(migrationURL string, dbSource string) {
log.Info().Msg("db migrated successfully")
}

func runTaskProcessor(config util.Config, redisOpt asynq.RedisClientOpt, store db.Store) {
func runTaskProcessor(
ctx context.Context,
waitGroup *errgroup.Group,
config util.Config,
redisOpt asynq.RedisClientOpt,
store db.Store,
) {
mailer := mail.NewGmailSender(config.EmailSenderName, config.EmailSenderAddress, config.EmailSenderPassword)
taskProcessor := worker.NewRedisTaskProcessor(redisOpt, store, mailer)

log.Info().Msg("start task processor")
err := taskProcessor.Start()
if err != nil {
log.Fatal().Err(err).Msg("failed to start task processor")
}

waitGroup.Go(func() error {
<-ctx.Done()
log.Info().Msg("graceful shutdown task processor")

taskProcessor.Shutdown()
log.Info().Msg("task processor is stopped")

return nil
})
}

func runGrpcServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) {
func runGrpcServer(
ctx context.Context,
waitGroup *errgroup.Group,
config util.Config,
store db.Store,
taskDistributor worker.TaskDistributor,
) {
server, err := gapi.NewServer(config, store, taskDistributor)
if err != nil {
log.Fatal().Err(err).Msg("cannot create server")
Expand All @@ -96,14 +140,39 @@ func runGrpcServer(config util.Config, store db.Store, taskDistributor worker.Ta
log.Fatal().Err(err).Msg("cannot create listener")
}

log.Info().Msgf("start gRPC server at %s", listener.Addr().String())
err = grpcServer.Serve(listener)
if err != nil {
log.Fatal().Err(err).Msg("cannot start gRPC server")
}
waitGroup.Go(func() error {
log.Info().Msgf("start gRPC server at %s", listener.Addr().String())

err = grpcServer.Serve(listener)
if err != nil {
if errors.Is(err, grpc.ErrServerStopped) {
return nil
}
log.Error().Err(err).Msg("gRPC server failed to serve")
return err
}

return nil
})

waitGroup.Go(func() error {
<-ctx.Done()
log.Info().Msg("graceful shutdown gRPC server")

grpcServer.GracefulStop()
log.Info().Msg("gRPC server is stopped")

return nil
})
}

func runGatewayServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) {
func runGatewayServer(
ctx context.Context,
waitGroup *errgroup.Group,
config util.Config,
store db.Store,
taskDistributor worker.TaskDistributor,
) {
server, err := gapi.NewServer(config, store, taskDistributor)
if err != nil {
log.Fatal().Err(err).Msg("cannot create server")
Expand All @@ -120,9 +189,6 @@ func runGatewayServer(config util.Config, store db.Store, taskDistributor worker

grpcMux := runtime.NewServeMux(jsonOption)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err = pb.RegisterSimpleBankHandlerServer(ctx, grpcMux, server)
if err != nil {
log.Fatal().Err(err).Msg("cannot register handler server")
Expand All @@ -139,17 +205,37 @@ func runGatewayServer(config util.Config, store db.Store, taskDistributor worker
swaggerHandler := http.StripPrefix("/swagger/", http.FileServer(statikFS))
mux.Handle("/swagger/", swaggerHandler)

listener, err := net.Listen("tcp", config.HTTPServerAddress)
if err != nil {
log.Fatal().Err(err).Msg("cannot create listener")
}
httpServer := &http.Server{
Handler: gapi.HttpLogger(mux),
Addr: config.HTTPServerAddress,
}

waitGroup.Go(func() error {
log.Info().Msgf("start HTTP gateway server at %s", httpServer.Addr)
err = httpServer.ListenAndServe()
if err != nil {
if errors.Is(err, http.ErrServerClosed) {
return nil
}
log.Error().Err(err).Msg("HTTP gateway server failed to serve")
return err
}
return nil
})

log.Info().Msgf("start HTTP gateway server at %s", listener.Addr().String())
handler := gapi.HttpLogger(mux)
err = http.Serve(listener, handler)
if err != nil {
log.Fatal().Err(err).Msg("cannot start HTTP gateway server")
}
waitGroup.Go(func() error {
<-ctx.Done()
log.Info().Msg("graceful shutdown HTTP gateway server")

err := httpServer.Shutdown(context.Background())
if err != nil {
log.Error().Err(err).Msg("failed to shutdown HTTP gateway server")
return err
}

log.Info().Msg("HTTP gateway server is stopped")
return nil
})
}

func runGinServer(config util.Config, store db.Store) {
Expand Down
5 changes: 5 additions & 0 deletions worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (

type TaskProcessor interface {
Start() error
Shutdown()
ProcessTaskSendVerifyEmail(ctx context.Context, task *asynq.Task) error
}

Expand Down Expand Up @@ -59,3 +60,7 @@ func (processor *RedisTaskProcessor) Start() error {

return processor.server.Start(mux)
}

func (processor *RedisTaskProcessor) Shutdown() {
processor.server.Shutdown()
}

0 comments on commit 3a4535f

Please sign in to comment.