Skip to content

Commit

Permalink
Fix stall in plugin manager shutdown. Closes #3817
Browse files Browse the repository at this point in the history
  • Loading branch information
binaek authored Sep 8, 2023
1 parent 09898c7 commit 862fcf1
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 53 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ jobs:
with:
version: v1.52.2
args: --timeout=15m --config=.golangci.yml
skip-pkg-cache: true
skip-build-cache: true

- name: Run CLI Unit Tests
run: |
Expand Down Expand Up @@ -220,7 +222,6 @@ jobs:
exit 1
fi
clean_up:
# let's clean up the artifacts.
# incase this step isn't reached,
Expand Down
22 changes: 19 additions & 3 deletions pkg/connection/connection_state_table_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package connection

import (
"context"
"log"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/turbot/steampipe/pkg/connection_state"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/db/db_local"
"github.com/turbot/steampipe/pkg/steampipeconfig"
"log"
)

type connectionStateTableUpdater struct {
Expand All @@ -18,6 +19,9 @@ type connectionStateTableUpdater struct {
}

func newConnectionStateTableUpdater(updates *steampipeconfig.ConnectionUpdates, pool *pgxpool.Pool) *connectionStateTableUpdater {
log.Println("[DEBUG] newConnectionStateTableUpdater start")
defer log.Println("[DEBUG] newConnectionStateTableUpdater end")

return &connectionStateTableUpdater{
updates: updates,
pool: pool,
Expand All @@ -26,7 +30,8 @@ func newConnectionStateTableUpdater(updates *steampipeconfig.ConnectionUpdates,

// update connection state table to indicate the updates that will be done
func (u *connectionStateTableUpdater) start(ctx context.Context) error {
log.Printf("[INFO] connectionStateTableUpdater start - update connection_state with intended states")
log.Println("[DEBUG] connectionStateTableUpdater.start start")
defer log.Println("[DEBUG] connectionStateTableUpdater.start end")

var queries []db_common.QueryWithArgs

Expand Down Expand Up @@ -69,11 +74,13 @@ func (u *connectionStateTableUpdater) start(ctx context.Context) error {
if _, err = db_local.ExecuteSqlWithArgsInTransaction(ctx, conn.Conn(), queries...); err != nil {
return err
}
log.Printf("[INFO] connectionStateTableUpdater start - finished updating connection_state with intended states")
return nil
}

func (u *connectionStateTableUpdater) onConnectionReady(ctx context.Context, conn *pgx.Conn, name string) error {
log.Println("[DEBUG] connectionStateTableUpdater.onConnectionReady start")
defer log.Println("[DEBUG] connectionStateTableUpdater.onConnectionReady end")

connection := u.updates.FinalConnectionState[name]
q := connection_state.GetSetConnectionStateSql(connection.ConnectionName, constants.ConnectionStateReady)
_, err := conn.Exec(ctx, q.Query, q.Args...)
Expand All @@ -85,6 +92,9 @@ func (u *connectionStateTableUpdater) onConnectionReady(ctx context.Context, con
}

func (u *connectionStateTableUpdater) onConnectionCommentsLoaded(ctx context.Context, conn *pgx.Conn, name string) error {
log.Println("[DEBUG] connectionStateTableUpdater.onConnectionCommentsLoaded start")
defer log.Println("[DEBUG] connectionStateTableUpdater.onConnectionCommentsLoaded end")

connection := u.updates.FinalConnectionState[name]
q := connection_state.GetSetConnectionStateCommentLoadedSql(connection.ConnectionName, true)
_, err := conn.Exec(ctx, q.Query, q.Args...)
Expand All @@ -96,6 +106,9 @@ func (u *connectionStateTableUpdater) onConnectionCommentsLoaded(ctx context.Con
}

func (u *connectionStateTableUpdater) onConnectionDeleted(ctx context.Context, conn *pgx.Conn, name string) error {
log.Println("[DEBUG] connectionStateTableUpdater.onConnectionDeleted start")
defer log.Println("[DEBUG] connectionStateTableUpdater.onConnectionDeleted end")

// if this connection has schema import disabled, DO NOT delete from the conneciotn state table
if _, connectionDisabled := u.updates.Disabled[name]; connectionDisabled {
return nil
Expand All @@ -110,6 +123,9 @@ func (u *connectionStateTableUpdater) onConnectionDeleted(ctx context.Context, c
}

func (u *connectionStateTableUpdater) onConnectionError(ctx context.Context, conn *pgx.Conn, connectionName string, err error) error {
log.Println("[DEBUG] connectionStateTableUpdater.onConnectionError start")
defer log.Println("[DEBUG] connectionStateTableUpdater.onConnectionError end")

q := connection_state.GetConnectionStateErrorSql(connectionName, err)
if _, err := conn.Exec(ctx, q.Query, q.Args...); err != nil {
return err
Expand Down
10 changes: 4 additions & 6 deletions pkg/connection/refresh_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe/pkg/steampipeconfig"
"github.com/turbot/steampipe/pkg/utils"
)

// only allow one execution of refresh connections
Expand All @@ -18,19 +17,18 @@ var executeLock sync.Mutex
var queueLock sync.Mutex

func RefreshConnections(ctx context.Context, pluginManager pluginManager, forceUpdateConnectionNames ...string) (res *steampipeconfig.RefreshConnectionResult) {
log.Println("[DEBUG] RefreshConnections start")
defer log.Println("[DEBUG] RefreshConnections end")

// TODO KAI if we, for example, access a nil map, this does not seem to catch it and startup hangs
defer func() {
if r := recover(); r != nil {
res = steampipeconfig.NewErrorRefreshConnectionResult(helpers.ToError(r))
}
}()
//time.Sleep(10 * time.Second)
utils.LogTime("RefreshConnections start")
defer utils.LogTime("RefreshConnections end")

t := time.Now()
log.Printf("[INFO] refreshConnections start")
defer log.Printf("[INFO] refreshConnections complete (%fs)", time.Since(t).Seconds())
defer log.Printf("[INFO] refreshConnections completion time (%fs)", time.Since(t).Seconds())

// first grab the queue lock
if !queueLock.TryLock() {
Expand Down
35 changes: 26 additions & 9 deletions pkg/connection/refresh_connections_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type refreshConnectionState struct {
}

func newRefreshConnectionState(ctx context.Context, pluginManager pluginManager, forceUpdateConnectionNames []string) (*refreshConnectionState, error) {
log.Println("[DEBUG] newRefreshConnectionState start")
defer log.Println("[DEBUG] newRefreshConnectionState end")

pool := pluginManager.Pool()
// set user search path first
log.Printf("[INFO] setting up search path")
Expand All @@ -74,6 +77,9 @@ func newRefreshConnectionState(ctx context.Context, pluginManager pluginManager,
// and update the database schema and search path to reflect the required connections
// return whether any changes have been made
func (s *refreshConnectionState) refreshConnections(ctx context.Context) {
log.Println("[DEBUG] refreshConnectionState.refreshConnections start")
defer log.Println("[DEBUG] refreshConnectionState.refreshConnections end")

// if there was an error (other than a connection error, which will NOT have been assigned to res),
// set state of all incomplete connections to error
defer func() {
Expand Down Expand Up @@ -200,13 +206,13 @@ func (s *refreshConnectionState) logRefreshConnectionResults() {
}

func (s *refreshConnectionState) executeConnectionQueries(ctx context.Context) {
log.Println("[DEBUG] refreshConnectionState.executeConnectionQueries start")
defer log.Println("[DEBUG] refreshConnectionState.executeConnectionQueries end")

// TODO WHY? WHY NOT FROM ourselves
// retrieve updates from the table updater
connectionUpdates := s.tableUpdater.updates

utils.LogTime("db.executeConnectionQueries start")
defer utils.LogTime("db.executeConnectionQueries start")

// execute deletions
if err := s.executeDeleteQueries(ctx, maps.Keys(s.connectionUpdates.Delete)); err != nil {
// just log
Expand All @@ -231,16 +237,14 @@ func (s *refreshConnectionState) executeConnectionQueries(ctx context.Context) {
log.Printf("[WARN] failed to send schema deletion Postgres notification: %s", err.Error())
}
}

return
}

// execute all update queries
// NOTE: this only sets res.Error if there is a failure to set update the connection state table
// - all other connection based failures are recorded in the connection state table
func (s *refreshConnectionState) executeUpdateQueries(ctx context.Context) {
utils.LogTime("db.executeUpdateQueries start")
defer utils.LogTime("db.executeUpdateQueries end")
log.Println("[DEBUG] refreshConnectionState.executeUpdateQueries start")
defer log.Println("[DEBUG] refreshConnectionState.executeUpdateQueries end")

defer func() {
if s.res.Error != nil {
Expand Down Expand Up @@ -355,13 +359,15 @@ func updateSetMapToArray(updateSetMap map[string][]*steampipeconfig.ConnectionSt
// create/update connections

func (s *refreshConnectionState) executeUpdatesInParallel(ctx context.Context, updates map[string]*steampipeconfig.ConnectionState) (errors []error) {
// just call executeUpdateSetsInParallel
log.Println("[DEBUG] refreshConnectionState.executeUpdatesInParallel start")
defer log.Println("[DEBUG] refreshConnectionState.executeUpdatesInParallel end")

// convert updates to update sets
updatesAsSets := make(map[string][]*steampipeconfig.ConnectionState, len(updates))
for k, v := range updates {
updatesAsSets[k] = []*steampipeconfig.ConnectionState{v}
}
// just call executeUpdateSetsInParallel
return s.executeUpdateSetsInParallel(ctx, updatesAsSets)
}

Expand All @@ -370,6 +376,9 @@ func (s *refreshConnectionState) executeUpdatesInParallel(ctx context.Context, u
// - for convenience we also use this function for static connections by mapping the input data
// from map[string]*steampipeconfig.ConnectionState to map[string][]*steampipeconfig.ConnectionState
func (s *refreshConnectionState) executeUpdateSetsInParallel(ctx context.Context, updates map[string][]*steampipeconfig.ConnectionState) (errors []error) {
log.Println("[DEBUG] refreshConnectionState.executeUpdateSetsInParallel start")
defer log.Println("[DEBUG] refreshConnectionState.executeUpdateSetsInParallel end")

var wg sync.WaitGroup
var errChan = make(chan *connectionError)

Expand Down Expand Up @@ -438,6 +447,8 @@ func (s *refreshConnectionState) executeUpdateSetsInParallel(ctx context.Context

// syncronously execute the update queries for one or more connections
func (s *refreshConnectionState) executeUpdateForConnections(ctx context.Context, errChan chan *connectionError, cloneSchemaEnabled bool, connectionStates ...*steampipeconfig.ConnectionState) {
log.Println("[DEBUG] refreshConnectionState.executeUpdateForConnections start")
defer log.Println("[DEBUG] refreshConnectionState.executeUpdateForConnections end")

for _, connectionState := range connectionStates {
connectionName := connectionState.ConnectionName
Expand Down Expand Up @@ -470,7 +481,10 @@ func (s *refreshConnectionState) executeUpdateForConnections(ctx context.Context
}
}

func (s *refreshConnectionState) executeUpdateQuery(ctx context.Context, sql, connectionName string) error {
func (s *refreshConnectionState) executeUpdateQuery(ctx context.Context, sql, connectionName string) (err error) {
log.Println("[DEBUG] refreshConnectionState.executeUpdateQuery start")
defer log.Println("[DEBUG] refreshConnectionState.executeUpdateQuery end")

// create a transaction
tx, err := s.pool.Begin(ctx)
if err != nil {
Expand Down Expand Up @@ -764,6 +778,9 @@ func (s *refreshConnectionState) setIncompleteConnectionStateToError(ctx context

// OnConnectionsChanged is the callback function invoked by the connection watcher when connections are added or removed
func (s *refreshConnectionState) sendPostgreSchemaNotification(ctx context.Context) error {
log.Println("[DEBUG] refreshConnectionState.sendPostgreSchemaNotification start")
defer log.Println("[DEBUG] refreshConnectionState.sendPostgreSchemaNotification end")

conn, err := s.pool.Acquire(ctx)
if err != nil {
return err
Expand Down
15 changes: 7 additions & 8 deletions pkg/db/db_local/create_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func CreateLocalDbConnection(ctx context.Context, opts *CreateDbOptions) (*pgx.C
}

// CreateConnectionPool

func CreateConnectionPool(ctx context.Context, opts *CreateDbOptions, maxConnections int) (*pgxpool.Pool, error) {
utils.LogTime("db_client.establishConnectionPool start")
defer utils.LogTime("db_client.establishConnectionPool end")
Expand All @@ -121,7 +120,7 @@ func CreateConnectionPool(ctx context.Context, opts *CreateDbOptions, maxConnect
return nil, err
}

connConfig, err := pgxpool.ParseConfig(psqlInfo)
poolConfig, err := pgxpool.ParseConfig(psqlInfo)
if err != nil {
return nil, err
}
Expand All @@ -131,17 +130,17 @@ func CreateConnectionPool(ctx context.Context, opts *CreateDbOptions, maxConnect
connMaxLifetime = 10 * time.Minute
)

connConfig.MinConns = 0
connConfig.MaxConns = int32(maxConnections)
connConfig.MaxConnLifetime = connMaxLifetime
connConfig.MaxConnIdleTime = connMaxIdleTime
poolConfig.MinConns = 0
poolConfig.MaxConns = int32(maxConnections)
poolConfig.MaxConnLifetime = connMaxLifetime
poolConfig.MaxConnIdleTime = connMaxIdleTime

connConfig.ConnConfig.Config.RuntimeParams = map[string]string{
poolConfig.ConnConfig.Config.RuntimeParams = map[string]string{
constants.RuntimeParamsKeyApplicationName: runtime.ServiceConnectionAppName,
}

// this returns connection pool
dbPool, err := pgxpool.NewWithConfig(context.Background(), connConfig)
dbPool, err := pgxpool.NewWithConfig(context.Background(), poolConfig)
if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/db/db_local/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db_local

import (
"context"
"log"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
Expand All @@ -10,6 +11,9 @@ import (
)

func executeSqlAsRoot(ctx context.Context, statements ...string) ([]pgconn.CommandTag, error) {
log.Println("[DEBUG] executeSqlAsRoot start")
defer log.Println("[DEBUG] executeSqlAsRoot end")

rootClient, err := CreateLocalDbConnection(ctx, &CreateDbOptions{Username: constants.DatabaseSuperUser})
if err != nil {
return nil, err
Expand All @@ -18,6 +22,9 @@ func executeSqlAsRoot(ctx context.Context, statements ...string) ([]pgconn.Comma
}

func ExecuteSqlInTransaction(ctx context.Context, conn *pgx.Conn, statements ...string) (results []pgconn.CommandTag, err error) {
log.Println("[DEBUG] ExecuteSqlInTransaction start")
defer log.Println("[DEBUG] ExecuteSqlInTransaction end")

err = pgx.BeginFunc(ctx, conn, func(tx pgx.Tx) error {
for _, statement := range statements {
result, err := tx.Exec(ctx, statement)
Expand All @@ -32,6 +39,9 @@ func ExecuteSqlInTransaction(ctx context.Context, conn *pgx.Conn, statements ...
}

func ExecuteSqlWithArgsInTransaction(ctx context.Context, conn *pgx.Conn, queries ...db_common.QueryWithArgs) (results []pgconn.CommandTag, err error) {
log.Println("[DEBUG] ExecuteSqlWithArgsInTransaction start")
defer log.Println("[DEBUG] ExecuteSqlWithArgsInTransaction end")

err = pgx.BeginFunc(ctx, conn, func(tx pgx.Tx) error {
for _, q := range queries {
result, err := tx.Exec(ctx, q.Query, q.Args...)
Expand Down
10 changes: 8 additions & 2 deletions pkg/pluginmanager/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func start(steampipeExecutablePath string) (*State, error) {

// Stop loads the plugin manager state and if a running instance is found, stop it
func Stop() error {
log.Println("[DEBUG] pluginmanager.Stop start")
defer log.Println("[DEBUG] pluginmanager.Stop end")
// try to load the plugin manager state
state, err := LoadState()
if err != nil {
Expand All @@ -97,7 +99,9 @@ func Stop() error {

// stop the running plugin manager instance
func stop(state *State) error {
log.Printf("[TRACE] plugin manager stop")
log.Println("[DEBUG] pluginmanager.stop start")
defer log.Println("[DEBUG] pluginmanager.stop end")

pluginManager, err := NewPluginManagerClient(state)
if err != nil {
return err
Expand All @@ -109,11 +113,13 @@ func stop(state *State) error {
if err != nil {
return err
}
log.Printf("[TRACE] pluginManager.Shutdown done")

// kill the underlying client
log.Printf("[TRACE] pluginManager.Shutdown killing raw client")
pluginManager.rawClient.Kill()
log.Printf("[TRACE] pluginManager.Shutdown killed raw client")

log.Printf("[TRACE] pluginManager state.kill")
// now kill the plugin manager
return state.kill()
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/pluginmanager/plugin_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func (c *PluginManagerClient) RefreshConnections(req *pb.RefreshConnectionsReque
}

func (c *PluginManagerClient) Shutdown(req *pb.ShutdownRequest) (*pb.ShutdownResponse, error) {
log.Printf("[TRACE] PluginManagerClient Shutdown")
log.Printf("[DEBUG] PluginManagerClient.Shutdown start")
defer log.Printf("[DEBUG] PluginManagerClient.Shutdown done")

res, err := c.manager.Shutdown(req)
if err != nil {
return nil, grpc.HandleGrpcError(err, "PluginManager", "Get")
Expand Down
Loading

0 comments on commit 862fcf1

Please sign in to comment.