Skip to content

Commit

Permalink
Remove unnecessary transactions from ConnectionStateUpdater. Closes #…
Browse files Browse the repository at this point in the history
  • Loading branch information
kaidaguerre authored Sep 28, 2023
1 parent 6407ff5 commit 61718df
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 20 deletions.
25 changes: 17 additions & 8 deletions pkg/connection/connection_state_table_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ func (u *connectionStateTableUpdater) onConnectionReady(ctx context.Context, con

connection := u.updates.FinalConnectionState[name]
queries := introspection.GetSetConnectionStateSql(connection.ConnectionName, constants.ConnectionStateReady)
if _, err := db_local.ExecuteSqlWithArgsInTransaction(ctx, conn, queries...); err != nil {
return err
for _, q := range queries {
if _, err := conn.Exec(ctx, q.Query, q.Args...); err != nil {
return err
}
}
return nil
}
Expand All @@ -95,8 +97,10 @@ func (u *connectionStateTableUpdater) onConnectionCommentsLoaded(ctx context.Con

connection := u.updates.FinalConnectionState[name]
queries := introspection.GetSetConnectionStateCommentLoadedSql(connection.ConnectionName, true)
if _, err := db_local.ExecuteSqlWithArgsInTransaction(ctx, conn, queries...); err != nil {
return err
for _, q := range queries {
if _, err := conn.Exec(ctx, q.Query, q.Args...); err != nil {
return err
}
}
return nil
}
Expand All @@ -110,8 +114,10 @@ func (u *connectionStateTableUpdater) onConnectionDeleted(ctx context.Context, c
return nil
}
queries := introspection.GetDeleteConnectionStateSql(name)
if _, err := db_local.ExecuteSqlWithArgsInTransaction(ctx, conn, queries...); err != nil {
return err
for _, q := range queries {
if _, err := conn.Exec(ctx, q.Query, q.Args...); err != nil {
return err
}
}
return nil
}
Expand All @@ -121,8 +127,11 @@ func (u *connectionStateTableUpdater) onConnectionError(ctx context.Context, con
defer log.Println("[DEBUG] connectionStateTableUpdater.onConnectionError end")

queries := introspection.GetConnectionStateErrorSql(connectionName, err)
if _, err := db_local.ExecuteSqlWithArgsInTransaction(ctx, conn, queries...); err != nil {
return err
for _, q := range queries {
if _, err := conn.Exec(ctx, q.Query, q.Args...); err != nil {
return err
}
}

return nil
}
26 changes: 14 additions & 12 deletions pkg/connection/refresh_connections_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (s *refreshConnectionState) refreshConnections(ctx context.Context) {

// NOTE: delete any DYNAMIC plugin connections which will be updated
// to avoid them being accessed before they are updated
// TODO sure we can remove this
s.executeDeleteQueries(ctx, s.connectionUpdates.DynamicUpdates())

// update connectionState table to reflect the updates (i.e. set connections to updating/deleting/ready as appropriate)
Expand Down Expand Up @@ -220,25 +221,25 @@ func (s *refreshConnectionState) executeConnectionQueries(ctx context.Context) {
log.Println("[DEBUG] refreshConnectionState.executeConnectionQueries start")
defer log.Println("[DEBUG] refreshConnectionState.executeConnectionQueries end")

// TODO WHY? WHY NOT FROM ourselves
// retrieve updates from the table updater
connectionUpdates := s.tableUpdater.updates

// execute deletions
if err := s.executeDeleteQueries(ctx, s.connectionUpdates.GetConnectionsToDelete()); err != nil {
// just log
log.Printf("[WARN] failed to delete all unused schemas: %s", err.Error())
}

// execute updates
numUpdates := len(connectionUpdates.Update)
numMissingComments := len(connectionUpdates.MissingComments)
numUpdates := len(s.connectionUpdates.Update)
numMissingComments := len(s.connectionUpdates.MissingComments)
log.Printf("[INFO] executeConnectionQueries: num updates: %d, connections missing comments: %d", numUpdates, numMissingComments)

if numUpdates+numMissingComments > 0 {
// get schema queries - this updates schemas for validated plugins and drops schemas for unvalidated plugins
s.executeUpdateQueries(ctx)
} else if len(connectionUpdates.Delete) > 0 {
// done
return
}

if len(s.connectionUpdates.Delete) > 0 {
log.Printf("[INFO] deleted all unnecessary schemas - sending notification")

// if there are no updates and there ARE deletes, notify
Expand Down Expand Up @@ -286,7 +287,7 @@ func (s *refreshConnectionState) executeUpdateQueries(ctx context.Context) {
moreErrors := s.executeUpdatesInParallel(ctx, initialUpdates)
errors = append(errors, moreErrors...)

// execute dynamic updates (not, we update all connections in search path order,
// execute dynamic updates (note, we update all connections in search path order,
// so must call executeUpdateSetsInParallel)
log.Printf("[INFO] executing dynamic updates")
moreErrors = s.executeUpdateSetsInParallel(ctx, dynamicUpdates)
Expand All @@ -313,7 +314,6 @@ func (s *refreshConnectionState) executeUpdateQueries(ctx context.Context) {
log.Printf("[INFO] updated all exemplar schemas - sending notification")
// now that we have updated all exemplar schemars, send postgres notification
// this gives any attached interactive clients a chance to update their inspect data and autocomplete

if err := s.pluginManager.SendPostgresSchemaNotification(ctx); err != nil {
// just log
log.Printf("[WARN] failed to send schem update Postgres notification: %s", err.Error())
Expand Down Expand Up @@ -381,7 +381,7 @@ func (s *refreshConnectionState) executeUpdatesInParallel(ctx context.Context, u
return s.executeUpdateSetsInParallel(ctx, updatesAsSets)
}

// execute sets of updates in parallel - this is required as for dynamic plugins, we must updated all connections in
// execute sets of updates in parallel - this is required as for dynamic plugins, we must update all connections in
// search path order
// - for convenience we also use this function for static connections by mapping the input data
// from map[string]*steampipeconfig.ConnectionState to map[string][]*steampipeconfig.ConnectionState
Expand Down Expand Up @@ -690,15 +690,17 @@ func (s *refreshConnectionState) getInitialAndRemainingUpdates() (initialUpdates
for _, connectionName := range searchPathConnections {
if connectionState, updateRequired := updates[connectionName]; updateRequired {
if connectionState.SchemaMode == plugin.SchemaModeDynamic {
dynamicUpdates[connectionState.Plugin] = append(dynamicUpdates[connectionState.Plugin], connectionState)
pluginInstance := *connectionState.PluginInstance
dynamicUpdates[pluginInstance] = append(dynamicUpdates[pluginInstance], connectionState)
} else {
initialUpdates[connectionName] = connectionState
}
}
}
// now add remaining updates to remainingUpdates
for connectionName, connectionState := range updates {
if _, isInitialUpdate := initialUpdates[connectionName]; !isInitialUpdate {
_, isInitialUpdate := initialUpdates[connectionName]
if connectionState.SchemaMode == plugin.SchemaModeStatic && !isInitialUpdate {
remainingUpdates[connectionName] = connectionState
}

Expand Down

0 comments on commit 61718df

Please sign in to comment.