diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 9fe72828ad..008b462ae4 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -2,7 +2,6 @@ package connmetadata import ( "context" - "errors" "fmt" "log/slog" @@ -10,8 +9,6 @@ import ( cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" - "github.com/jackc/pgerrcode" - "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" ) @@ -20,11 +17,6 @@ const ( lastSyncStateTableName = "last_sync_state" ) -func isUniqueError(err error) bool { - var pgerr *pgconn.PgError - return errors.As(err, &pgerr) && pgerr.Code == pgerrcode.UniqueViolation -} - type PostgresMetadataStore struct { ctx context.Context config *protos.PostgresConfig @@ -114,7 +106,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error { // create the schema _, err = tx.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName) - if err != nil && !isUniqueError(err) { + if err != nil && !utils.IsUniqueError(err) { p.logger.Error("failed to create schema", slog.Any("error", err)) return err } @@ -128,7 +120,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error { sync_batch_id BIGINT NOT NULL ) `) - if err != nil && !isUniqueError(err) { + if err != nil && !utils.IsUniqueError(err) { p.logger.Error("failed to create last sync state table", slog.Any("error", err)) return err } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index c8720aca26..52eb0d77d1 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -374,7 +374,7 @@ func (c *PostgresConnector) createSlotAndPublication( func (c *PostgresConnector) createMetadataSchema(createSchemaTx pgx.Tx) error { _, err := createSchemaTx.Exec(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema)) - if err != nil { + if err != nil && !utils.IsUniqueError(err) { return fmt.Errorf("error while creating internal schema: %w", err) } return nil diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index ad85126fea..127d30d491 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -158,7 +158,7 @@ func (c *PostgresConnector) SetupMetadataTables() error { } _, err = createMetadataTablesTx.Exec(c.ctx, fmt.Sprintf(createMirrorJobsTableSQL, c.metadataSchema, mirrorJobsTableIdentifier)) - if err != nil { + if err != nil && !utils.IsUniqueError(err) { return fmt.Errorf("error creating table %s: %w", mirrorJobsTableIdentifier, err) } diff --git a/flow/connectors/utils/postgres.go b/flow/connectors/utils/postgres.go index 58cd02f205..b043698943 100644 --- a/flow/connectors/utils/postgres.go +++ b/flow/connectors/utils/postgres.go @@ -2,14 +2,22 @@ package utils import ( "context" + "errors" "fmt" "net/url" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/jackc/pgerrcode" + "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" ) +func IsUniqueError(err error) bool { + var pgerr *pgconn.PgError + return errors.As(err, &pgerr) && pgerr.Code == pgerrcode.UniqueViolation +} + func GetPGConnectionString(pgConfig *protos.PostgresConfig) string { passwordEscaped := url.QueryEscape(pgConfig.Password) // for a url like postgres://user:password@host:port/dbname