From 3a4535f624aa14299cb4497274ea200d62afbf15 Mon Sep 17 00:00:00 2001 From: Quang Pham Date: Sat, 3 Feb 2024 11:58:11 +0100 Subject: [PATCH] Graceful shutdown servers and workers (#113) * ft: graceful shutdown servers and workers * add new lectures to README file --- README.md | 3 + main.go | 136 ++++++++++++++++++++++++++++++++++++-------- worker/processor.go | 5 ++ 3 files changed, 119 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index b72d952c..2c5b6cda 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,9 @@ This course is designed with a lot of details, so that everyone, even with very - Lecture #68: [Docker compose: port + volume mapping](https://www.youtube.com/watch?v=nJBT5SKENAw&list=PLy_6D98if3ULEtXtNSY_2qN21VCKgoQAE) - Lecture #69: [How to install & use binary packages in Go](https://www.youtube.com/watch?v=TnJ4ssoNvkY&list=PLy_6D98if3ULEtXtNSY_2qN21VCKgoQAE) - Lecture #70: [Implement role-based access control (RBAC) in Go](https://www.youtube.com/watch?v=Py7dRhtuJ3E&list=PLy_6D98if3ULEtXtNSY_2qN21VCKgoQAE) +- Lecture #71: [Grant AWS EKS cluster access to Postgres and Redis using security group](https://www.youtube.com/watch?v=pPXYu6QQGE8&list=PLy_6D98if3ULEtXtNSY_2qN21VCKgoQAE) +- Lecture #72: [Deploy gRPC + HTTP server to AWS EKS cluster](https://www.youtube.com/watch?v=Pd7aeh014nU&list=PLy_6D98if3ULEtXtNSY_2qN21VCKgoQAE) +- Lecture #73: [Don't lose money on AWS](https://www.youtube.com/watch?v=VEf7IpUn6BQ&list=PLy_6D98if3ULEtXtNSY_2qN21VCKgoQAE) ## Simple bank service diff --git a/main.go b/main.go index eefb0c42..6f111ae8 100644 --- a/main.go +++ b/main.go @@ -2,9 +2,12 @@ package main import ( "context" + "errors" "net" "net/http" "os" + "os/signal" + "syscall" "github.com/golang-migrate/migrate/v4" _ "github.com/golang-migrate/migrate/v4/database/postgres" @@ -23,11 +26,18 @@ import ( "github.com/techschool/simplebank/pb" "github.com/techschool/simplebank/util" "github.com/techschool/simplebank/worker" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "google.golang.org/protobuf/encoding/protojson" ) +var interruptSignals = []os.Signal{ + os.Interrupt, + syscall.SIGTERM, + syscall.SIGINT, +} + func main() { config, err := util.LoadConfig(".") if err != nil { @@ -38,7 +48,10 @@ func main() { log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) } - connPool, err := pgxpool.New(context.Background(), config.DBSource) + ctx, stop := signal.NotifyContext(context.Background(), interruptSignals...) + defer stop() + + connPool, err := pgxpool.New(ctx, config.DBSource) if err != nil { log.Fatal().Err(err).Msg("cannot connect to db") } @@ -52,9 +65,17 @@ func main() { } taskDistributor := worker.NewRedisTaskDistributor(redisOpt) - go runTaskProcessor(config, redisOpt, store) - go runGatewayServer(config, store, taskDistributor) - runGrpcServer(config, store, taskDistributor) + + waitGroup, ctx := errgroup.WithContext(ctx) + + runTaskProcessor(ctx, waitGroup, config, redisOpt, store) + runGatewayServer(ctx, waitGroup, config, store, taskDistributor) + runGrpcServer(ctx, waitGroup, config, store, taskDistributor) + + err = waitGroup.Wait() + if err != nil { + log.Fatal().Err(err).Msg("error from wait group") + } } func runDBMigration(migrationURL string, dbSource string) { @@ -70,17 +91,40 @@ func runDBMigration(migrationURL string, dbSource string) { log.Info().Msg("db migrated successfully") } -func runTaskProcessor(config util.Config, redisOpt asynq.RedisClientOpt, store db.Store) { +func runTaskProcessor( + ctx context.Context, + waitGroup *errgroup.Group, + config util.Config, + redisOpt asynq.RedisClientOpt, + store db.Store, +) { mailer := mail.NewGmailSender(config.EmailSenderName, config.EmailSenderAddress, config.EmailSenderPassword) taskProcessor := worker.NewRedisTaskProcessor(redisOpt, store, mailer) + log.Info().Msg("start task processor") err := taskProcessor.Start() if err != nil { log.Fatal().Err(err).Msg("failed to start task processor") } + + waitGroup.Go(func() error { + <-ctx.Done() + log.Info().Msg("graceful shutdown task processor") + + taskProcessor.Shutdown() + log.Info().Msg("task processor is stopped") + + return nil + }) } -func runGrpcServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) { +func runGrpcServer( + ctx context.Context, + waitGroup *errgroup.Group, + config util.Config, + store db.Store, + taskDistributor worker.TaskDistributor, +) { server, err := gapi.NewServer(config, store, taskDistributor) if err != nil { log.Fatal().Err(err).Msg("cannot create server") @@ -96,14 +140,39 @@ func runGrpcServer(config util.Config, store db.Store, taskDistributor worker.Ta log.Fatal().Err(err).Msg("cannot create listener") } - log.Info().Msgf("start gRPC server at %s", listener.Addr().String()) - err = grpcServer.Serve(listener) - if err != nil { - log.Fatal().Err(err).Msg("cannot start gRPC server") - } + waitGroup.Go(func() error { + log.Info().Msgf("start gRPC server at %s", listener.Addr().String()) + + err = grpcServer.Serve(listener) + if err != nil { + if errors.Is(err, grpc.ErrServerStopped) { + return nil + } + log.Error().Err(err).Msg("gRPC server failed to serve") + return err + } + + return nil + }) + + waitGroup.Go(func() error { + <-ctx.Done() + log.Info().Msg("graceful shutdown gRPC server") + + grpcServer.GracefulStop() + log.Info().Msg("gRPC server is stopped") + + return nil + }) } -func runGatewayServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) { +func runGatewayServer( + ctx context.Context, + waitGroup *errgroup.Group, + config util.Config, + store db.Store, + taskDistributor worker.TaskDistributor, +) { server, err := gapi.NewServer(config, store, taskDistributor) if err != nil { log.Fatal().Err(err).Msg("cannot create server") @@ -120,9 +189,6 @@ func runGatewayServer(config util.Config, store db.Store, taskDistributor worker grpcMux := runtime.NewServeMux(jsonOption) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - err = pb.RegisterSimpleBankHandlerServer(ctx, grpcMux, server) if err != nil { log.Fatal().Err(err).Msg("cannot register handler server") @@ -139,17 +205,37 @@ func runGatewayServer(config util.Config, store db.Store, taskDistributor worker swaggerHandler := http.StripPrefix("/swagger/", http.FileServer(statikFS)) mux.Handle("/swagger/", swaggerHandler) - listener, err := net.Listen("tcp", config.HTTPServerAddress) - if err != nil { - log.Fatal().Err(err).Msg("cannot create listener") - } + httpServer := &http.Server{ + Handler: gapi.HttpLogger(mux), + Addr: config.HTTPServerAddress, + } + + waitGroup.Go(func() error { + log.Info().Msgf("start HTTP gateway server at %s", httpServer.Addr) + err = httpServer.ListenAndServe() + if err != nil { + if errors.Is(err, http.ErrServerClosed) { + return nil + } + log.Error().Err(err).Msg("HTTP gateway server failed to serve") + return err + } + return nil + }) - log.Info().Msgf("start HTTP gateway server at %s", listener.Addr().String()) - handler := gapi.HttpLogger(mux) - err = http.Serve(listener, handler) - if err != nil { - log.Fatal().Err(err).Msg("cannot start HTTP gateway server") - } + waitGroup.Go(func() error { + <-ctx.Done() + log.Info().Msg("graceful shutdown HTTP gateway server") + + err := httpServer.Shutdown(context.Background()) + if err != nil { + log.Error().Err(err).Msg("failed to shutdown HTTP gateway server") + return err + } + + log.Info().Msg("HTTP gateway server is stopped") + return nil + }) } func runGinServer(config util.Config, store db.Store) { diff --git a/worker/processor.go b/worker/processor.go index cbf262a1..bdfc1098 100644 --- a/worker/processor.go +++ b/worker/processor.go @@ -17,6 +17,7 @@ const ( type TaskProcessor interface { Start() error + Shutdown() ProcessTaskSendVerifyEmail(ctx context.Context, task *asynq.Task) error } @@ -59,3 +60,7 @@ func (processor *RedisTaskProcessor) Start() error { return processor.server.Start(mux) } + +func (processor *RedisTaskProcessor) Shutdown() { + processor.server.Shutdown() +}