From 2b8a575fc7a7706d80f52ab5f6516f2f910a83d9 Mon Sep 17 00:00:00 2001
From: Amogh Bharadwaj
Date: Fri, 22 Mar 2024 01:40:10 +0530
Subject: [PATCH] Refactor peer validate check functions (#1521)

Validation functions for most of our peers were inside NewSnowflakeConnector(), NewBigQueryConnector() and so on. We instantiate new connectors often during mirrors, where it isn't necessary to validate again and again.

This PR introduces a new `ValidationConnector` interface with a `ValidateCheck()` method which performs the validation; it is called only during peer validation. The validation which was previously done in the NewConnector() functions is now done in `ValidateCheck()`.

Functionally tested for all peers that have validation:
- Snowflake
- BigQuery
- Clickhouse
- S3
---
 flow/cmd/validate_peer.go                | 12 ++++++++++++
 flow/connectors/bigquery/bigquery.go     | 18 ++++++------------
 flow/connectors/clickhouse/clickhouse.go | 23 +++++++++--------------
 flow/connectors/core.go                  | 13 +++++++++++++
 flow/connectors/s3/s3.go                 | 18 ++++++------------
 flow/connectors/snowflake/snowflake.go   | 16 ++++++----------
 6 files changed, 52 insertions(+), 48 deletions(-)

diff --git a/flow/cmd/validate_peer.go b/flow/cmd/validate_peer.go
index 5bbc1cfb2c..6a77fbe5ca 100644
--- a/flow/cmd/validate_peer.go
+++ b/flow/cmd/validate_peer.go
@@ -55,6 +55,18 @@ func (h *FlowRequestHandler) ValidatePeer(
 		}
 	}
 
+	validationConn, ok := conn.(connectors.ValidationConnector)
+	if ok {
+		validErr := validationConn.ValidateCheck(ctx)
+		if validErr != nil {
+			return &protos.ValidatePeerResponse{
+				Status: protos.ValidatePeerStatus_INVALID,
+				Message: fmt.Sprintf("failed to validate %s peer %s: %v",
+					req.Peer.Type, req.Peer.Name, validErr),
+			}, nil
+		}
+	}
+
 	connErr := conn.ConnectionActive(ctx)
 	if connErr != nil {
 		return &protos.ValidatePeerResponse{
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 5b2b8b01f8..db56cfd9fc 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -131,14 +131,14 @@ func (bqsa *BigQueryServiceAccount) CreateStorageClient(ctx context.Context) (*s
 	return client, nil
 }
 
-// TableCheck:
+// ValidateCheck:
 // 1. Creates a table
 // 2. Inserts one row into the table
 // 3. Deletes the table
-func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, project string) error {
+func (c *BigQueryConnector) ValidateCheck(ctx context.Context) error {
 	dummyTable := "peerdb_validate_dummy_" + shared.RandomString(4)
 
-	newTable := client.DatasetInProject(project, dataset).Table(dummyTable)
+	newTable := c.client.DatasetInProject(c.projectID, c.datasetID).Table(dummyTable)
 
 	createErr := newTable.Create(ctx, &bigquery.TableMetadata{
 		Schema: []*bigquery.FieldSchema{
@@ -155,9 +155,9 @@ func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, pr
 	}
 
 	var errs []error
-	insertQuery := client.Query(fmt.Sprintf("INSERT INTO %s VALUES(true)", dummyTable))
-	insertQuery.DefaultDatasetID = dataset
-	insertQuery.DefaultProjectID = project
+	insertQuery := c.client.Query(fmt.Sprintf("INSERT INTO %s VALUES(true)", dummyTable))
+	insertQuery.DefaultDatasetID = c.datasetID
+	insertQuery.DefaultProjectID = c.projectID
 	_, insertErr := insertQuery.Run(ctx)
 	if insertErr != nil {
 		errs = append(errs, fmt.Errorf("unable to validate insertion into table: %w. ", insertErr))
@@ -207,12 +207,6 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
 		return nil, fmt.Errorf("failed to get dataset metadata: %v", datasetErr)
 	}
 
-	permissionErr := TableCheck(ctx, client, datasetID, projectID)
-	if permissionErr != nil {
-		logger.Error("failed to get run mock table check", "error", permissionErr)
-		return nil, permissionErr
-	}
-
 	storageClient, err := bqsa.CreateStorageClient(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create Storage client: %v", err)
diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go
index 2390ad1109..a4e959b505 100644
--- a/flow/connectors/clickhouse/clickhouse.go
+++ b/flow/connectors/clickhouse/clickhouse.go
@@ -56,27 +56,32 @@ func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error
 }
 
 // Creates and drops a dummy table to validate the peer
-func ValidateClickhouse(ctx context.Context, conn *sql.DB) error {
+func (c *ClickhouseConnector) ValidateCheck(ctx context.Context) error {
 	validateDummyTableName := "peerdb_validation_" + shared.RandomString(4)
 
 	// create a table
-	_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory",
+	_, err := c.database.ExecContext(ctx, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory",
 		validateDummyTableName))
 	if err != nil {
 		return fmt.Errorf("failed to create validation table %s: %w", validateDummyTableName, err)
 	}
 
 	// insert a row
-	_, err = conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (1)", validateDummyTableName))
+	_, err = c.database.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (1)", validateDummyTableName))
 	if err != nil {
 		return fmt.Errorf("failed to insert into validation table %s: %w", validateDummyTableName, err)
 	}
 
 	// drop the table
-	_, err = conn.ExecContext(ctx, "DROP TABLE IF EXISTS "+validateDummyTableName)
+	_, err = c.database.ExecContext(ctx, "DROP TABLE IF EXISTS "+validateDummyTableName)
 	if err != nil {
 		return fmt.Errorf("failed to drop validation table %s: %w", validateDummyTableName, err)
 	}
 
+	validateErr := ValidateS3(ctx, c.creds)
+	if validateErr != nil {
+		return fmt.Errorf("failed to validate S3 bucket: %w", validateErr)
+	}
+
 	return nil
 }
 
@@ -90,11 +95,6 @@ func NewClickhouseConnector(
 		return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
 	}
 
-	err = ValidateClickhouse(ctx, database)
-	if err != nil {
-		return nil, fmt.Errorf("invalidated Clickhouse peer: %w", err)
-	}
-
 	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
 	if err != nil {
 		logger.Error("failed to create postgres metadata store", "error", err)
@@ -122,11 +122,6 @@ func NewClickhouseConnector(
 		clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix)
 	}
 
-	validateErr := ValidateS3(ctx, clickhouseS3Creds)
-	if validateErr != nil {
-		return nil, fmt.Errorf("failed to validate S3 bucket: %w", validateErr)
-	}
-
 	return &ClickhouseConnector{
 		database:   database,
 		pgMetadata: pgMetadata,
diff --git a/flow/connectors/core.go b/flow/connectors/core.go
index 988b8e3f28..39e31f8171 100644
--- a/flow/connectors/core.go
+++ b/flow/connectors/core.go
@@ -27,6 +27,14 @@ type Connector interface {
 	ConnectionActive(context.Context) error
 }
 
+type ValidationConnector interface {
+	Connector
+
+	// ValidateCheck performs validation for the connector;
+	// this usually includes permissions to create and use objects (tables, schema etc.).
+	ValidateCheck(context.Context) error
+}
+
 type GetTableSchemaConnector interface {
 	Connector
 
@@ -279,4 +287,9 @@ var (
 
 	_ QRepConsolidateConnector = &connsnowflake.SnowflakeConnector{}
 	_ QRepConsolidateConnector = &connclickhouse.ClickhouseConnector{}
+
+	_ ValidationConnector = &connsnowflake.SnowflakeConnector{}
+	_ ValidationConnector = &connclickhouse.ClickhouseConnector{}
+	_ ValidationConnector = &connbigquery.BigQueryConnector{}
+	_ ValidationConnector = &conns3.S3Connector{}
 )
diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go
index de24f7e090..ddb6061b0d 100644
--- a/flow/connectors/s3/s3.go
+++ b/flow/connectors/s3/s3.go
@@ -89,10 +89,10 @@ func (c *S3Connector) Close() error {
 	return nil
 }
 
-func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error {
+func (c *S3Connector) ValidateCheck(ctx context.Context) error {
 	reader := strings.NewReader(time.Now().Format(time.RFC3339))
 
-	bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketURL)
+	bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(c.url)
 	if parseErr != nil {
 		return fmt.Errorf("failed to parse bucket url: %w", parseErr)
 	}
@@ -100,7 +100,7 @@ func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, meta
 	// Write an empty file and then delete it
 	// to check if we have write permissions
 	bucketName := aws.String(bucketPrefix.Bucket)
-	_, putErr := s3Client.PutObject(ctx, &s3.PutObjectInput{
+	_, putErr := c.client.PutObject(ctx, &s3.PutObjectInput{
 		Bucket: bucketName,
 		Key:    aws.String(_peerDBCheck),
 		Body:   reader,
@@ -109,7 +109,7 @@ func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, meta
 		return fmt.Errorf("failed to write to bucket: %w", putErr)
 	}
 
-	_, delErr := s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
+	_, delErr := c.client.DeleteObject(ctx, &s3.DeleteObjectInput{
 		Bucket: bucketName,
 		Key:    aws.String(_peerDBCheck),
 	})
@@ -118,8 +118,8 @@ func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, meta
 	}
 
 	// check if we can ping external metadata
-	if metadataDB != nil {
-		err := metadataDB.Ping(ctx)
+	if c.pgMetadata != nil {
+		err := c.pgMetadata.Ping(ctx)
 		if err != nil {
 			return fmt.Errorf("failed to ping external metadata: %w", err)
 		}
@@ -129,12 +129,6 @@ func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, meta
 }
 
 func (c *S3Connector) ConnectionActive(ctx context.Context) error {
-	validErr := ValidCheck(ctx, &c.client, c.url, c.pgMetadata)
-	if validErr != nil {
-		c.logger.Error("failed to validate s3 connector:", "error", validErr)
-		return validErr
-	}
-
 	return nil
 }
 
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index b1877320e1..f67fe59df0 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -104,10 +104,11 @@ type UnchangedToastColumnResult struct {
 	UnchangedToastColumns ArrayString
 }
 
-func ValidationCheck(ctx context.Context, database *sql.DB, schemaName string) error {
+func (c *SnowflakeConnector) ValidateCheck(ctx context.Context) error {
+	schemaName := c.rawSchema
 	// check if schema exists
 	var schemaExists sql.NullBool
-	err := database.QueryRowContext(ctx, checkIfSchemaExistsSQL, schemaName).Scan(&schemaExists)
+	err := c.database.QueryRowContext(ctx, checkIfSchemaExistsSQL, schemaName).Scan(&schemaExists)
 	if err != nil {
 		return fmt.Errorf("error while checking if schema exists: %w", err)
 	}
@@ -116,9 +117,9 @@ func ValidationCheck(ctx context.Context, database *sql.DB, schemaName string) e
 	// In a transaction, create a table, insert a row into the table and then drop the table
 	// If any of these steps fail, the transaction will be rolled back
-	tx, err := database.BeginTx(ctx, nil)
+	tx, err := c.database.BeginTx(ctx, nil)
 	if err != nil {
-		return fmt.Errorf("failed to begin transaction: %w", err)
+		return fmt.Errorf("failed to begin transaction for table check: %w", err)
 	}
 
 	// in case we return after error, ensure transaction is rolled back
 	defer func() {
@@ -158,7 +159,7 @@ func ValidationCheck(ctx context.Context, database *sql.DB, schemaName string) e
 	// commit transaction
 	err = tx.Commit()
 	if err != nil {
-		return fmt.Errorf("failed to commit transaction: %w", err)
+		return fmt.Errorf("failed to commit transaction for table check: %w", err)
 	}
 
 	return nil
@@ -212,11 +213,6 @@ func NewSnowflakeConnector(
 		rawSchema = *snowflakeProtoConfig.MetadataSchema
 	}
 
-	err = ValidationCheck(ctx, database, rawSchema)
-	if err != nil {
-		return nil, fmt.Errorf("could not validate snowflake peer: %w", err)
-	}
-
 	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("could not connect to metadata store: %w", err)
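For reference, a minimal, self-contained sketch (not part of the patch) of the calling pattern it introduces: a connector that satisfies the new ValidationConnector interface, and a caller that type-asserts for it before the usual ConnectionActive() probe, mirroring flow/cmd/validate_peer.go. The toyConnector type and the validatePeer helper below are hypothetical stand-ins; only the interface shape comes from the patch.

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
)

// Trimmed-down stand-ins for the interfaces in flow/connectors/core.go.
type Connector interface {
	Close() error
	ConnectionActive(context.Context) error
}

type ValidationConnector interface {
	Connector

	// ValidateCheck performs peer validation, usually by creating,
	// using and dropping a scratch object to prove permissions.
	ValidateCheck(context.Context) error
}

// toyConnector is a hypothetical peer connector used only for illustration.
type toyConnector struct {
	reachable bool
}

func (c *toyConnector) Close() error { return nil }

func (c *toyConnector) ConnectionActive(ctx context.Context) error {
	if !c.reachable {
		return errors.New("peer is unreachable")
	}
	return nil
}

func (c *toyConnector) ValidateCheck(ctx context.Context) error {
	// A real implementation would create, insert into and drop a dummy table here.
	if !c.reachable {
		return errors.New("cannot create validation table")
	}
	return nil
}

// validatePeer mirrors ValidatePeer: run the optional ValidateCheck first,
// then the cheap ConnectionActive probe.
func validatePeer(ctx context.Context, conn Connector) error {
	if vc, ok := conn.(ValidationConnector); ok {
		if err := vc.ValidateCheck(ctx); err != nil {
			return fmt.Errorf("failed to validate peer: %w", err)
		}
	}
	if err := conn.ConnectionActive(ctx); err != nil {
		return fmt.Errorf("peer connection is not active: %w", err)
	}
	return nil
}

func main() {
	if err := validatePeer(context.Background(), &toyConnector{reachable: true}); err != nil {
		log.Fatal(err)
	}
	fmt.Println("peer validated")
}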