Skip to content

Commit

Permalink
DEVOPS-30046: improving dbproxy structure and logging for debugging. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
leandrorichardtoledo authored Dec 27, 2024
1 parent 69544a4 commit ec8cfac
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 107 deletions.
10 changes: 7 additions & 3 deletions dbproxy/cmd/dbproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func main() {
flag.Parse()

logger = zap.New(zap.UseFlagOptions(&opts))
dbproxy.SetLogger(logger)
mgr, err := dbproxy.New(context.TODO(), dbproxy.Config{

mgr, err := dbproxy.New(context.TODO(), logger, dbproxy.Config{
DBCredentialPath: dbCredentialPath,
PGCredentialPath: pbCredentialPath,
PGBStartScript: pgbStartScriptPath,
Expand Down Expand Up @@ -94,17 +94,21 @@ func catch(cancel func()) {
logger.Error(err, "failed to convert pid to int")
os.Exit(1)
}
logger.Info("terminating pgbouncer pid", "pid", pid)

// Terminate pgbouncer
logger.Info("terminating pgbouncer pid", "pid", pid)
cmd := exec.Command("sh", "-c", fmt.Sprintf("kill -s 9 %d", pid))

stdoutStderr, err := cmd.CombinedOutput()
if err != nil {
logger.Error(err, "failed to kill pgbouncer")
}
logger.Info("pgbouncer stop executed", "output", stdoutStderr)

// Capture log pgbouncer.log and write to stdout
logger.Info("capturing pgbouncer.log")
cmd = exec.Command("sh", "-c", fmt.Sprintf("cat %s", "pgbouncer.log"))

stdoutStderr, err = cmd.CombinedOutput()
if err != nil {
logger.Error(err, "failed to cat log", "output", string(stdoutStderr))
Expand Down
180 changes: 90 additions & 90 deletions dbproxy/dbproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dbproxy
import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
Expand All @@ -12,27 +11,14 @@ import (

"github.com/fsnotify/fsnotify"
"github.com/go-logr/logr"
"github.com/go-logr/zapr"
"github.com/infobloxopen/db-controller/dbproxy/pgbouncer"
"go.uber.org/zap"
)

var logger logr.Logger

// DebugLevel is used to set V level to 1 as suggested by official docs
// https://github.com/kubernetes-sigs/controller-runtime/blob/main/TMP-LOGGING.md
const debugLevel = 1

func init() {
zapLog, _ := zap.NewDevelopment()
logger = zapr.NewLogger(zapLog)
}

func SetLogger(l logr.Logger) {
logger = l
pgbouncer.SetLogger(l)
}

// Config is the configuration for the dbproxy manager.
type Config struct {
StartUpTime time.Duration
DBCredentialPath string
Expand All @@ -42,81 +28,39 @@ type Config struct {
LocalAddr string
}

func waitForFiles(ctx context.Context, paths ...string) error {
var errs []error
wg := &sync.WaitGroup{}
for _, path := range paths {
wg.Add(1)
go func(path string) {
defer wg.Done()
err := waitForFile(ctx, path)
if err != nil {
errs = append(errs, err)
}
}(path)
}
wg.Wait()
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
// DBProxy is a manager for the dbproxy.
type DBProxy struct {
logger logr.Logger
cfg Config
}

func waitForFile(ctx context.Context, path string) error {
logr := logger.WithValues("path", path)
var sleep time.Duration
for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout_wait_for_file: %s", path)
case <-time.After(sleep):
sleep = 5 * time.Second
time.Sleep(time.Second)

file, err := os.Open(path)
if err != nil {
logr.Error(err, "file_not_found")
continue
}

stat, err := file.Stat()
if err != nil {
logr.Error(err, "failed to stat file")
continue
}

if !stat.Mode().IsRegular() {
logr.Info("irregular file")
continue
}
return nil
}
}
}

type mgr struct {
cfg Config
}

func New(ctx context.Context, cfg Config) (*mgr, error) {

flag.Parse()

// New creates a new dbproxy manager.
func New(ctx context.Context, logger logr.Logger, cfg Config) (*DBProxy, error) {
if cfg.StartUpTime == 0 {
cfg.StartUpTime = 30 * time.Second
}

pgbouncer.SetLogger(logger)

waitCtx, cancel := context.WithTimeout(ctx, cfg.StartUpTime)
if err := waitForFiles(waitCtx, cfg.DBCredentialPath); err != nil {
defer cancel()

if err := waitForFiles(waitCtx, logger, cfg.DBCredentialPath); err != nil {
return nil, err
}
cancel()
return &mgr{cfg: cfg}, nil

dbp := DBProxy{
logger: logger,
cfg: cfg,
}

return &dbp, nil
}

func (m *mgr) Start(ctx context.Context) error {
// Start starts the dbproxy manager.
func (dbp *DBProxy) Start(ctx context.Context) error {

cfg := m.cfg
cfg := dbp.cfg

pgbCfg, err := pgbouncer.NewConfig(pgbouncer.Params{
DSNPath: cfg.DBCredentialPath,
Expand All @@ -128,17 +72,17 @@ func (m *mgr) Start(ctx context.Context) error {
}

if err := pgbCfg.Write(); err != nil {
return err
return fmt.Errorf("pgbCfg.Write failed: %w", err)
}

if err := pgbouncer.Start(context.TODO(), cfg.PGBStartScript); err != nil {
return err
return fmt.Errorf("pgbouncer.Start failed: %w", err)
}

// Watch for ongoing changes and regenerate pgbouncer config
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
return fmt.Errorf("fsnotify.NewWatcher failed: %w", err)
}
defer watcher.Close()

Expand All @@ -150,50 +94,54 @@ func (m *mgr) Start(ctx context.Context) error {
select {
case <-ctx.Done():
done <- nil
dbp.logger.Info("context done")
return
case event, ok := <-watcher.Events:
if !ok {
return
}
logger.V(debugLevel).Info("received_update", "event", event)

dbp.logger.Info("received_update", "event", event)

// FIXME: check if file change is one we actually care about
err := watcher.Remove(cfg.DBCredentialPath)
if err != nil {
dbp.logger.Error(err, "remove_failed", "credential_path", cfg.DBCredentialPath)
if !errors.Is(err, fsnotify.ErrNonExistentWatch) {
log.Fatal("Remove failed:", err)
}
}

waitCtx, cancel := context.WithTimeout(ctx, cfg.StartUpTime)
if err := waitForFiles(waitCtx, cfg.DBCredentialPath); err != nil {
logger.Error(err, "waitForFiles failed")
if err := waitForFiles(waitCtx, dbp.logger, cfg.DBCredentialPath); err != nil {
dbp.logger.Error(err, "waitForFiles failed", "credential_path", cfg.DBCredentialPath)
}
cancel()

err = watcher.Add(cfg.DBCredentialPath)
if err != nil {
logger.Error(err, "add failed")
dbp.logger.Error(err, "add_failed", "credential_path", cfg.DBCredentialPath)
os.Exit(1)
}

err = pgbCfg.Write()
if errors.Is(err, pgbouncer.ErrDuplicateWrite) {
logger.V(debugLevel).Info("ignoring duplicate write")
dbp.logger.Error(err, "ignoring_duplicate_write")
continue
} else if err != nil {
log.Println("parseWritePGBConfig failed:", err)
dbp.logger.Error(err, "parseWritePGBConfig failed")
continue
}

logger.V(debugLevel).Info("reload_pgbouncer")
dbp.logger.Info("reloading_pgbouncer")
if err := pgbouncer.Reload(context.TODO(), cfg.PGBReloadScript); err != nil {
logger.Error(err, "reload_pgbouncer_error")
dbp.logger.Error(err, "pgbouncer.Reload failed")
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
logger.Error(err, "watcher_err")
dbp.logger.Error(err, "watcher_error")
}
}

Expand All @@ -205,3 +153,55 @@ func (m *mgr) Start(ctx context.Context) error {
}
return <-done
}

func waitForFiles(ctx context.Context, logger logr.Logger, paths ...string) error {
var errs []error
wg := &sync.WaitGroup{}
for _, path := range paths {
wg.Add(1)
go func(path string) {
defer wg.Done()
err := waitForFile(ctx, logger, path)
if err != nil {
errs = append(errs, err)
}
}(path)
}
wg.Wait()
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}

func waitForFile(ctx context.Context, logger logr.Logger, path string) error {
logr := logger.WithValues("method", "waitForFile", "path", path)
var sleep time.Duration
for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout_wait_for_file: %s", path)
case <-time.After(sleep):
sleep = 5 * time.Second
time.Sleep(time.Second)

file, err := os.Open(path)
if err != nil {
logr.Error(err, "file_not_found")
continue
}

stat, err := file.Stat()
if err != nil {
logr.Error(err, "failed to stat file")
continue
}

if !stat.Mode().IsRegular() {
logr.Info("irregular file")
continue
}
return nil
}
}
}
16 changes: 13 additions & 3 deletions dbproxy/dbproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,31 @@ import (
"path/filepath"
"testing"
"time"

"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func TestE2E(t *testing.T) {

// Context tracks how long we'll wait to find credential files
// Create dummy logger for testing.
opts := zap.Options{
Development: true,
}
logger := zap.New(zap.UseFlagOptions(&opts))

// Context tracks how long we'll wait to find credential files.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mgr, err := New(ctx, Config{

dbpConfig := Config{
DBCredentialPath: testDSNURIPath,
PGCredentialPath: filepath.Join(tempDir, "pgbouncer.ini"),
PGBStartScript: "scripts/mock-start-pgbouncer.sh",
PGBReloadScript: "scripts/mock-start-pgbouncer.sh",
LocalAddr: "0.0.0.0:5432",
})
}

mgr, err := New(ctx, logger, dbpConfig)
if err != nil {
t.Fatal(err)
}
Expand Down
13 changes: 11 additions & 2 deletions dbproxy/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"testing"
"time"

"github.com/go-logr/logr"

"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/lib/pq"
)

Expand All @@ -25,14 +29,19 @@ var (
)

func TestMain(m *testing.M) {
opts := zap.Options{
Development: true,
}
logger := zap.New(zap.UseFlagOptions(&opts))

var err error
tempDir, err = os.MkdirTemp("", "dbproxy")
if err != nil {
panic(err)
}

var cleanupTestDB func()
testdb, testDSN, cleanupTestDB = Run(RunConfig{
testdb, testDSN, cleanupTestDB = Run(logger, RunConfig{
Database: "postgres",
Username: "postgres",
Password: "postgres",
Expand Down Expand Up @@ -106,7 +115,7 @@ type RunConfig struct {

// Run a PostgreSQL database in a Docker container and return a connection to it.
// The caller is responsible for calling the func() to prevent leaking containers.
func Run(cfg RunConfig) (*sql.DB, string, func()) {
func Run(logger logr.Logger, cfg RunConfig) (*sql.DB, string, func()) {
port := getEphemeralPort()

// Required parameters
Expand Down
Loading

0 comments on commit ec8cfac

Please sign in to comment.