From 8583d62b65b59e547f5651b06c0cf0f95358420a Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Tue, 12 Mar 2024 20:15:28 +0530 Subject: [PATCH] setting timeouts to 0 for consolidate txn (#1474) --- flow/connectors/postgres/qrep.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index df79659f5c..2dc1a5bd09 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -599,6 +599,11 @@ func (c *PostgresConnector) isPartitionSynced(ctx context.Context, partitionID s } func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig) error { + if config.SourcePeer.Type != protos.DBType_SNOWFLAKE || + config.WriteMode.WriteType != protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { + return nil + } + destinationTables := strings.Split(config.DestinationTableIdentifier, ";") constraintsHookExists := true @@ -621,6 +626,19 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi } }() + _, err = tx.Exec(ctx, "SET statement_timeout=0") + if err != nil { + return fmt.Errorf("failed to set statement_timeout: %w", err) + } + _, err = tx.Exec(ctx, "SET idle_in_transaction_session_timeout=0") + if err != nil { + return fmt.Errorf("failed to set idle_in_transaction_session_timeout: %w", err) + } + _, err = tx.Exec(ctx, "SET lock_timeout=0") + if err != nil { + return fmt.Errorf("failed to set lock_timeout: %w", err) + } + for _, dstTableName := range destinationTables { dstSchemaTable, err := utils.ParseSchemaTable(dstTableName) if err != nil {