Refactor peer validate check functions (#1521)
Validation functions for most of our peers lived inside
NewSnowflakeConnector(), NewBigQueryConnector(), and so on.
We instantiate new connectors often during mirrors, where it isn't
necessary to validate again and again.

This PR introduces a new method on the Connector interface,
`ValidateCheck()`, which performs the validation and is called only
during validate peer. The validation that was previously done in
NewConnector() now happens there. A sketch of the pattern follows
the test list below.

Functionally tested for all peers that have validation:
- Snowflake
- BigQuery
- Clickhouse
- S3
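
For readers unfamiliar with the idiom, here is a minimal, self-contained sketch of the optional-capability pattern this PR adopts. The names mirror the PR's types, but the snippet is a simplified stand-in, not the actual flow/connectors code:

package connectors

import (
	"context"
	"fmt"
)

// Connector is the base interface every peer connector implements.
type Connector interface {
	Close() error
	ConnectionActive(context.Context) error
}

// ValidationConnector is an optional capability: only connectors
// that support validation implement it.
type ValidationConnector interface {
	Connector

	ValidateCheck(context.Context) error
}

// validate runs ValidateCheck only when the connector opts in,
// mirroring the type assertion added to ValidatePeer.
func validate(ctx context.Context, conn Connector) error {
	if validationConn, ok := conn.(ValidationConnector); ok {
		if err := validationConn.ValidateCheck(ctx); err != nil {
			return fmt.Errorf("validation failed: %w", err)
		}
	}
	// Connectors without ValidateCheck skip straight to the liveness check.
	return conn.ConnectionActive(ctx)
}

Connectors that never implement ValidateCheck pay no cost: the type assertion simply fails and validation is skipped.
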
Amogh-Bharadwaj authored Mar 21, 2024
1 parent 15c1a68 commit 2b8a575
Showing 6 changed files with 52 additions and 48 deletions.
12 changes: 12 additions & 0 deletions flow/cmd/validate_peer.go
@@ -55,6 +55,18 @@ func (h *FlowRequestHandler) ValidatePeer(
 		}
 	}
 
+	validationConn, ok := conn.(connectors.ValidationConnector)
+	if ok {
+		validErr := validationConn.ValidateCheck(ctx)
+		if validErr != nil {
+			return &protos.ValidatePeerResponse{
+				Status: protos.ValidatePeerStatus_INVALID,
+				Message: fmt.Sprintf("failed to validate %s peer %s: %v",
+					req.Peer.Type, req.Peer.Name, validErr),
+			}, nil
+		}
+	}
+
 	connErr := conn.ConnectionActive(ctx)
 	if connErr != nil {
 		return &protos.ValidatePeerResponse{
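Note that a failed ValidateCheck is surfaced as a ValidatePeerStatus_INVALID response with a nil error, so the RPC itself completes normally and callers receive a human-readable message rather than a transport failure.
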
18 changes: 6 additions & 12 deletions flow/connectors/bigquery/bigquery.go
@@ -131,14 +131,14 @@ func (bqsa *BigQueryServiceAccount) CreateStorageClient(ctx context.Context) (*storage.Client, error) {
 	return client, nil
 }
 
-// TableCheck:
+// ValidateCheck:
 // 1. Creates a table
 // 2. Inserts one row into the table
 // 3. Deletes the table
-func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, project string) error {
+func (c *BigQueryConnector) ValidateCheck(ctx context.Context) error {
 	dummyTable := "peerdb_validate_dummy_" + shared.RandomString(4)
 
-	newTable := client.DatasetInProject(project, dataset).Table(dummyTable)
+	newTable := c.client.DatasetInProject(c.projectID, c.datasetID).Table(dummyTable)
 
 	createErr := newTable.Create(ctx, &bigquery.TableMetadata{
 		Schema: []*bigquery.FieldSchema{
@@ -155,9 +155,9 @@ func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, project string) error {
 	}
 
 	var errs []error
-	insertQuery := client.Query(fmt.Sprintf("INSERT INTO %s VALUES(true)", dummyTable))
-	insertQuery.DefaultDatasetID = dataset
-	insertQuery.DefaultProjectID = project
+	insertQuery := c.client.Query(fmt.Sprintf("INSERT INTO %s VALUES(true)", dummyTable))
+	insertQuery.DefaultDatasetID = c.datasetID
+	insertQuery.DefaultProjectID = c.projectID
 	_, insertErr := insertQuery.Run(ctx)
 	if insertErr != nil {
 		errs = append(errs, fmt.Errorf("unable to validate insertion into table: %w. ", insertErr))
@@ -207,12 +207,6 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*BigQueryConnector, error) {
 		return nil, fmt.Errorf("failed to get dataset metadata: %v", datasetErr)
 	}
 
-	permissionErr := TableCheck(ctx, client, datasetID, projectID)
-	if permissionErr != nil {
-		logger.Error("failed to get run mock table check", "error", permissionErr)
-		return nil, permissionErr
-	}
-
 	storageClient, err := bqsa.CreateStorageClient(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create Storage client: %v", err)
23 changes: 9 additions & 14 deletions flow/connectors/clickhouse/clickhouse.go
@@ -56,27 +56,32 @@ func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error {
 }
 
 // Creates and drops a dummy table to validate the peer
-func ValidateClickhouse(ctx context.Context, conn *sql.DB) error {
+func (c *ClickhouseConnector) ValidateCheck(ctx context.Context) error {
 	validateDummyTableName := "peerdb_validation_" + shared.RandomString(4)
 	// create a table
-	_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory",
+	_, err := c.database.ExecContext(ctx, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory",
 		validateDummyTableName))
 	if err != nil {
 		return fmt.Errorf("failed to create validation table %s: %w", validateDummyTableName, err)
 	}
 
 	// insert a row
-	_, err = conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (1)", validateDummyTableName))
+	_, err = c.database.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (1)", validateDummyTableName))
 	if err != nil {
 		return fmt.Errorf("failed to insert into validation table %s: %w", validateDummyTableName, err)
 	}
 
 	// drop the table
-	_, err = conn.ExecContext(ctx, "DROP TABLE IF EXISTS "+validateDummyTableName)
+	_, err = c.database.ExecContext(ctx, "DROP TABLE IF EXISTS "+validateDummyTableName)
 	if err != nil {
 		return fmt.Errorf("failed to drop validation table %s: %w", validateDummyTableName, err)
 	}
 
+	validateErr := ValidateS3(ctx, c.creds)
+	if validateErr != nil {
+		return fmt.Errorf("failed to validate S3 bucket: %w", validateErr)
+	}
+
 	return nil
 }
 
@@ -90,11 +95,6 @@ func NewClickhouseConnector(
 		return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
 	}
 
-	err = ValidateClickhouse(ctx, database)
-	if err != nil {
-		return nil, fmt.Errorf("invalidated Clickhouse peer: %w", err)
-	}
-
 	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
 	if err != nil {
 		logger.Error("failed to create postgres metadata store", "error", err)
@@ -122,11 +122,6 @@ func NewClickhouseConnector(
 		clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix)
 	}
 
-	validateErr := ValidateS3(ctx, clickhouseS3Creds)
-	if validateErr != nil {
-		return nil, fmt.Errorf("failed to validate S3 bucket: %w", validateErr)
-	}
-
 	return &ClickhouseConnector{
 		database:   database,
 		pgMetadata: pgMetadata,
13 changes: 13 additions & 0 deletions flow/connectors/core.go
@@ -27,6 +27,14 @@ type Connector interface {
 	ConnectionActive(context.Context) error
 }
 
+type ValidationConnector interface {
+	Connector
+
+	// ValidateCheck performs validation for the connector; this usually
+	// includes permissions to create and use objects (tables, schemas, etc.).
+	ValidateCheck(context.Context) error
+}
+
 type GetTableSchemaConnector interface {
 	Connector
 
@@ -279,4 +287,9 @@ var (
 
 	_ QRepConsolidateConnector = &connsnowflake.SnowflakeConnector{}
 	_ QRepConsolidateConnector = &connclickhouse.ClickhouseConnector{}
+
+	_ ValidationConnector = &connsnowflake.SnowflakeConnector{}
+	_ ValidationConnector = &connclickhouse.ClickhouseConnector{}
+	_ ValidationConnector = &connbigquery.BigQueryConnector{}
+	_ ValidationConnector = &conns3.S3Connector{}
 )
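The `_ ValidationConnector = &connsnowflake.SnowflakeConnector{}` entries above are Go's standard compile-time interface assertions: assigning a value of the concrete type to a blank variable of the interface type makes the build fail if the type ever stops implementing the interface, at zero runtime cost. A hypothetical illustration (MyConnector is not a real PeerDB type):

// If MyConnector loses its ValidateCheck method, this declaration
// stops compiling, catching the regression before runtime.
var _ ValidationConnector = &MyConnector{}
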
18 changes: 6 additions & 12 deletions flow/connectors/s3/s3.go
@@ -89,18 +89,18 @@ func (c *S3Connector) Close() error {
 	return nil
 }
 
-func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error {
+func (c *S3Connector) ValidateCheck(ctx context.Context) error {
 	reader := strings.NewReader(time.Now().Format(time.RFC3339))
 
-	bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketURL)
+	bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(c.url)
 	if parseErr != nil {
 		return fmt.Errorf("failed to parse bucket url: %w", parseErr)
 	}
 
 	// Write an empty file and then delete it
 	// to check if we have write permissions
 	bucketName := aws.String(bucketPrefix.Bucket)
-	_, putErr := s3Client.PutObject(ctx, &s3.PutObjectInput{
+	_, putErr := c.client.PutObject(ctx, &s3.PutObjectInput{
 		Bucket: bucketName,
 		Key:    aws.String(_peerDBCheck),
 		Body:   reader,
@@ -109,7 +109,7 @@ func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error {
 		return fmt.Errorf("failed to write to bucket: %w", putErr)
 	}
 
-	_, delErr := s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
+	_, delErr := c.client.DeleteObject(ctx, &s3.DeleteObjectInput{
 		Bucket: bucketName,
 		Key:    aws.String(_peerDBCheck),
 	})
@@ -118,8 +118,8 @@ func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error {
 	}
 
 	// check if we can ping external metadata
-	if metadataDB != nil {
-		err := metadataDB.Ping(ctx)
+	if c.pgMetadata != nil {
+		err := c.pgMetadata.Ping(ctx)
 		if err != nil {
 			return fmt.Errorf("failed to ping external metadata: %w", err)
 		}
@@ -129,12 +129,6 @@ func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error {
 }
 
 func (c *S3Connector) ConnectionActive(ctx context.Context) error {
-	validErr := ValidCheck(ctx, &c.client, c.url, c.pgMetadata)
-	if validErr != nil {
-		c.logger.Error("failed to validate s3 connector:", "error", validErr)
-		return validErr
-	}
-
 	return nil
 }
16 changes: 6 additions & 10 deletions flow/connectors/snowflake/snowflake.go
@@ -104,10 +104,11 @@ type UnchangedToastColumnResult struct {
 	UnchangedToastColumns ArrayString
 }
 
-func ValidationCheck(ctx context.Context, database *sql.DB, schemaName string) error {
+func (c *SnowflakeConnector) ValidateCheck(ctx context.Context) error {
+	schemaName := c.rawSchema
 	// check if schema exists
 	var schemaExists sql.NullBool
-	err := database.QueryRowContext(ctx, checkIfSchemaExistsSQL, schemaName).Scan(&schemaExists)
+	err := c.database.QueryRowContext(ctx, checkIfSchemaExistsSQL, schemaName).Scan(&schemaExists)
 	if err != nil {
 		return fmt.Errorf("error while checking if schema exists: %w", err)
 	}
@@ -116,9 +117,9 @@ func ValidationCheck(ctx context.Context, database *sql.DB, schemaName string) error {
 
 	// In a transaction, create a table, insert a row into the table and then drop the table
 	// If any of these steps fail, the transaction will be rolled back
-	tx, err := database.BeginTx(ctx, nil)
+	tx, err := c.database.BeginTx(ctx, nil)
 	if err != nil {
-		return fmt.Errorf("failed to begin transaction: %w", err)
+		return fmt.Errorf("failed to begin transaction for table check: %w", err)
 	}
 	// in case we return after error, ensure transaction is rolled back
 	defer func() {
@@ -158,7 +159,7 @@ func ValidationCheck(ctx context.Context, database *sql.DB, schemaName string) error {
 	// commit transaction
 	err = tx.Commit()
 	if err != nil {
-		return fmt.Errorf("failed to commit transaction: %w", err)
+		return fmt.Errorf("failed to commit transaction for table check: %w", err)
 	}
 
 	return nil
@@ -212,11 +213,6 @@ func NewSnowflakeConnector(
 		rawSchema = *snowflakeProtoConfig.MetadataSchema
 	}
 
-	err = ValidationCheck(ctx, database, rawSchema)
-	if err != nil {
-		return nil, fmt.Errorf("could not validate snowflake peer: %w", err)
-	}
-
 	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("could not connect to metadata store: %w", err)
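
The middle of the Snowflake ValidateCheck, collapsed in the hunks above, follows a common database/sql idiom: run the create/insert/drop inside a transaction and defer a rollback so every early-return path cleans up after itself. A simplified sketch of that idiom with an assumed table name, not the verbatim PeerDB code:

package validation

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"
)

func validateInTx(ctx context.Context, db *sql.DB, schemaName string) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction for table check: %w", err)
	}
	// Rollback after a successful Commit returns sql.ErrTxDone and is
	// otherwise a no-op, so deferring it covers every early return.
	defer func() {
		if err := tx.Rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) {
			log.Printf("failed to roll back validation transaction: %v", err)
		}
	}()

	table := schemaName + ".PEERDB_VALIDATION" // hypothetical table name
	if _, err := tx.ExecContext(ctx, "CREATE TABLE "+table+" (id INT)"); err != nil {
		return fmt.Errorf("failed to create validation table: %w", err)
	}
	if _, err := tx.ExecContext(ctx, "INSERT INTO "+table+" VALUES (1)"); err != nil {
		return fmt.Errorf("failed to insert into validation table: %w", err)
	}
	if _, err := tx.ExecContext(ctx, "DROP TABLE "+table); err != nil {
		return fmt.Errorf("failed to drop validation table: %w", err)
	}
	return tx.Commit()
}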
