diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 874f9f865a..df79659f5c 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -23,8 +23,10 @@ import ( "github.com/PeerDB-io/peer-flow/shared" ) -const qRepMetadataTableName = "_peerdb_query_replication_metadata" -const QRepOverwriteTempTablePrefix = "_peerdb_overwrite_" +const ( + qRepMetadataTableName = "_peerdb_query_replication_metadata" + QRepOverwriteTempTablePrefix = "_peerdb_overwrite_" +) func (c *PostgresConnector) GetQRepPartitions( ctx context.Context, @@ -654,7 +656,7 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi if constraintsHookExists { c.logger.Info("executing constraints hook", slog.String("procName", fmt.Sprintf("_peerdb_post_run_hook_%s()", config.FlowJobName))) - _, err = tx.Exec(ctx, fmt.Sprintf("CALL _peerdb_post_run_hook_%s()", config.FlowJobName)) + _, err = tx.Exec(ctx, fmt.Sprintf("SELECT _peerdb_post_run_hook_%s()", config.FlowJobName)) if err != nil { return fmt.Errorf("failed to execute stored procedure for applying constraints: %w", err) }