Merge remote-tracking branch 'origin/main' into stable
iskakaushik committed Jul 3, 2023
2 parents c425923 + 69f2a08 commit 4d146ad
Showing 10 changed files with 246 additions and 157 deletions.
6 changes: 4 additions & 2 deletions README.md
@@ -86,6 +86,7 @@ Real-time syncing of data from source to target based on change-feed or CDC (log
| --- | --- | --- | --- |
| CDC | PostgreSQL | BigQuery | Beta |
| CDC | PostgreSQL | Snowflake | Beta |
+ | CDC | PostgreSQL | Kafka | Beta |
| Initial Load | PostgreSQL | BigQuery | Coming Soon! |
| Initial Load | PostgreSQL | Snowflake | Coming Soon! |

@@ -95,8 +96,9 @@ Continuous syncing of data from source to target based on any SELECT query on th

| Source | Target | Status |
| --- | --- | --- |
- | PostgreSQL | BigQuery | Under development |
- | PostgreSQL | Snowflake | Under development |
+ | PostgreSQL | BigQuery | Beta |
+ | PostgreSQL | Snowflake | Beta |
+ | PostgreSQL | S3 | Under development |

## License

15 changes: 4 additions & 11 deletions flow/connectors/snowflake/avro_writer.go
@@ -10,7 +10,6 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/linkedin/goavro/v2"
log "github.com/sirupsen/logrus"
@@ -76,20 +75,14 @@ func WriteRecordsToS3(
}
}()

- awsSecrets, err := utils.GetAWSSecrets()
+ s3svc, err := utils.CreateS3Client()
if err != nil {
- log.Errorf("failed to get AWS secrets: %v", err)
- return fmt.Errorf("failed to get AWS secrets: %w", err)
+ log.Errorf("failed to create S3 client: %v", err)
+ return fmt.Errorf("failed to create S3 client: %w", err)
}

- // Initialize a session that the SDK will use to load
- // credentials from the shared credentials file. (~/.aws/credentials).
- sess := session.Must(session.NewSession(&aws.Config{
- Region: aws.String(awsSecrets.Region),
- }))
-
- // Create an uploader with the session and default options
- uploader := s3manager.NewUploader(sess)
+ uploader := s3manager.NewUploaderWithClient(s3svc)

// Upload the file to S3.
result, err := uploader.Upload(&s3manager.UploadInput{
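Putting the pieces together, the refactored upload path in this file reduces to roughly the sketch below. It assumes the CreateS3Client helper added to flow/connectors/utils/aws.go later in this commit; the uploadAvroToS3 wrapper and its bucket/key/path parameters are illustrative placeholders, not code from the repository.

    package main

    import (
        "fmt"
        "os"

        "github.com/PeerDB-io/peer-flow/connectors/utils"
        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/service/s3/s3manager"
    )

    // uploadAvroToS3 is a hypothetical wrapper showing the new call sequence.
    func uploadAvroToS3(localPath, bucket, key string) error {
        // The shared helper builds an *s3.S3 client from AWS_REGION and the
        // credential environment variables, replacing the per-call session.
        s3svc, err := utils.CreateS3Client()
        if err != nil {
            return fmt.Errorf("failed to create S3 client: %w", err)
        }

        file, err := os.Open(localPath)
        if err != nil {
            return fmt.Errorf("failed to open file: %w", err)
        }
        defer file.Close()

        // Reuse the existing client instead of constructing a fresh session.
        uploader := s3manager.NewUploaderWithClient(s3svc)
        result, err := uploader.Upload(&s3manager.UploadInput{
            Bucket: aws.String(bucket),
            Key:    aws.String(key),
            Body:   file,
        })
        if err != nil {
            return fmt.Errorf("failed to upload to S3: %w", err)
        }

        fmt.Println("uploaded to", result.Location)
        return nil
    }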
112 changes: 69 additions & 43 deletions flow/connectors/snowflake/qrep.go
@@ -10,7 +10,6 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
log "github.com/sirupsen/logrus"
@@ -131,55 +130,52 @@ func (c *SnowflakeConnector) isPartitionSynced(partitionID string) (bool, error)
}

func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error {
+ err := c.createQRepMetadataTable()
+ if err != nil {
+ return err
+ }
+
+ stageName := c.getStageNameForJob(config.FlowJobName)
+
+ err = c.createStage(stageName, config)
+ if err != nil {
+ return err
+ }
+
+ return nil
+ }
+
+ func (c *SnowflakeConnector) createQRepMetadataTable() error {
// Define the schema
schemaStatement := `
- CREATE TABLE IF NOT EXISTS %s.%s (
+   CREATE TABLE IF NOT EXISTS %s.%s (
flowJobName STRING,
partitionID STRING,
syncPartition STRING,
syncStartTime TIMESTAMP_LTZ,
syncFinishTime TIMESTAMP_LTZ
- );
+   );
`
queryString := fmt.Sprintf(schemaStatement, "public", qRepMetadataTableName)

// Execute the query
_, err := c.database.Exec(queryString)
if err != nil {
log.Errorf("failed to create table %s.%s: %v", "public", qRepMetadataTableName, err)
return fmt.Errorf("failed to create table %s.%s: %w", "public", qRepMetadataTableName, err)
}

log.Infof("Created table %s", qRepMetadataTableName)
+ return nil
+ }

- stageName := c.getStageNameForJob(config.FlowJobName)

+ func (c *SnowflakeConnector) createStage(stageName string, config *protos.QRepConfig) error {
var createStageStmt string
// if config staging path starts with S3 we need to create an external stage.
if strings.HasPrefix(config.StagingPath, "s3://") {
- awsCreds, err := utils.GetAWSSecrets()
+ stmt, err := c.createExternalStage(stageName, config)
if err != nil {
log.Errorf("failed to get AWS secrets: %v", err)
return fmt.Errorf("failed to get AWS secrets: %w", err)
return err
}

- credsStr := fmt.Sprintf("CREDENTIALS=(AWS_KEY_ID='%s' AWS_SECRET_KEY='%s')",
- awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
-
- s3o, err := utils.NewS3BucketAndPrefix(config.StagingPath)
- if err != nil {
- log.Errorf("failed to create S3 bucket and prefix: %v", err)
- return fmt.Errorf("failed to create S3 bucket and prefix: %w", err)
- }
-
- cleanURL := fmt.Sprintf("s3://%s/%s/%s", s3o.Bucket, s3o.Prefix, config.FlowJobName)
-
- stageStatement := `
- CREATE OR REPLACE STAGE %s
- URL = '%s'
- %s
- FILE_FORMAT = (TYPE = AVRO);
- `
- createStageStmt = fmt.Sprintf(stageStatement, stageName, cleanURL, credsStr)
+ createStageStmt = stmt
} else {
stageStatement := `
CREATE OR REPLACE STAGE %s
@@ -189,7 +185,7 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig)
}

// Execute the query
- _, err = c.database.Exec(createStageStmt)
+ _, err := c.database.Exec(createStageStmt)
if err != nil {
log.Errorf("failed to create stage %s: %v", stageName, err)
return fmt.Errorf("failed to create stage %s: %w", stageName, err)
@@ -199,6 +195,42 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig)
return nil
}

+ func (c *SnowflakeConnector) createExternalStage(stageName string, config *protos.QRepConfig) (string, error) {
+ awsCreds, err := utils.GetAWSSecrets()
+ if err != nil {
+ log.Errorf("failed to get AWS secrets: %v", err)
+ return "", fmt.Errorf("failed to get AWS secrets: %w", err)
+ }
+
+ s3o, err := utils.NewS3BucketAndPrefix(config.StagingPath)
+ if err != nil {
+ log.Errorf("failed to extract S3 bucket and prefix: %v", err)
+ return "", fmt.Errorf("failed to extract S3 bucket and prefix: %w", err)
+ }
+
+ cleanURL := fmt.Sprintf("s3://%s/%s/%s", s3o.Bucket, s3o.Prefix, config.FlowJobName)
+
+ s3Int := config.DestinationPeer.GetSnowflakeConfig().S3Integration
+ if s3Int == "" {
+ credsStr := fmt.Sprintf("CREDENTIALS=(AWS_KEY_ID='%s' AWS_SECRET_KEY='%s')",
+ awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
+
+ stageStatement := `
+ CREATE OR REPLACE STAGE %s
+ URL = '%s'
+ %s
+ FILE_FORMAT = (TYPE = AVRO);`
+ return fmt.Sprintf(stageStatement, stageName, cleanURL, credsStr), nil
+ } else {
+ stageStatement := `
+ CREATE OR REPLACE STAGE %s
+ URL = '%s'
+ STORAGE_INTEGRATION = %s
+ FILE_FORMAT = (TYPE = AVRO);`
+ return fmt.Sprintf(stageStatement, stageName, cleanURL, s3Int), nil
+ }
+ }

func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig) error {
log.Infof("Consolidating partitions for flow job %s", config.FlowJobName)

@@ -282,12 +314,6 @@ func (c *SnowflakeConnector) dropStage(stagingPath string, job string) error {

// if s3 we need to delete the contents of the bucket
if strings.HasPrefix(stagingPath, "s3://") {
- awsCreds, err := utils.GetAWSSecrets()
- if err != nil {
- log.Errorf("failed to get AWS secrets: %v", err)
- return fmt.Errorf("failed to get AWS secrets: %w", err)
- }
-
s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
if err != nil {
log.Errorf("failed to create S3 bucket and prefix: %v", err)
@@ -297,20 +323,20 @@ func (c *SnowflakeConnector) dropStage(stagingPath string, job string) error {
log.Infof("Deleting contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job)

// deleting the contents of the bucket with prefix
- sess := session.Must(session.NewSession(&aws.Config{
- Region: aws.String(awsCreds.Region),
- }))
-
- s3Svc := s3.New(sess)
- s3Client := s3manager.NewBatchDelete(sess)
+ s3svc, err := utils.CreateS3Client()
+ if err != nil {
+ log.Errorf("failed to create S3 client: %v", err)
+ return fmt.Errorf("failed to create S3 client: %w", err)
+ }

// Create a list of all objects with the defined prefix in the bucket
- iter := s3manager.NewDeleteListIterator(s3Svc, &s3.ListObjectsInput{
+ iter := s3manager.NewDeleteListIterator(s3svc, &s3.ListObjectsInput{
Bucket: aws.String(s3o.Bucket),
Prefix: aws.String(fmt.Sprintf("%s/%s", s3o.Prefix, job)),
})

// Iterate through the objects in the bucket with the prefix and delete them
+ s3Client := s3manager.NewBatchDeleteWithClient(s3svc)
if err := s3Client.Delete(aws.BackgroundContext(), iter); err != nil {
log.Errorf("failed to delete objects from bucket: %v", err)
return fmt.Errorf("failed to delete objects from bucket: %w", err)
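For reference, substituting a hypothetical stage name, bucket, and flow job into the templates in the new createExternalStage helper above, its two branches render statements along these lines (every identifier and value here is a placeholder):

    -- without an S3 integration: inline credentials
    CREATE OR REPLACE STAGE peerdb_stage_myjob
    URL = 's3://my-bucket/my-prefix/myjob'
    CREDENTIALS=(AWS_KEY_ID='<access-key-id>' AWS_SECRET_KEY='<secret-access-key>')
    FILE_FORMAT = (TYPE = AVRO);

    -- with SnowflakeConfig.S3Integration set: no credentials in the DDL
    CREATE OR REPLACE STAGE peerdb_stage_myjob
    URL = 's3://my-bucket/my-prefix/myjob'
    STORAGE_INTEGRATION = peerdb_s3_integration
    FILE_FORMAT = (TYPE = AVRO);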
37 changes: 28 additions & 9 deletions flow/connectors/utils/aws.go
@@ -4,33 +4,38 @@ import (
"fmt"
"os"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)

type AWSSecrets struct {
AccessKeyID string
SecretAccessKey string
+ AwsRoleArn string
Region string
}

func GetAWSSecrets() (*AWSSecrets, error) {
- awsKey := os.Getenv("AWS_ACCESS_KEY_ID")
- if awsKey == "" {
- return nil, fmt.Errorf("AWS_ACCESS_KEY_ID must be set")
+ awsRegion := os.Getenv("AWS_REGION")
+ if awsRegion == "" {
+ return nil, fmt.Errorf("AWS_REGION must be set")
}

+ awsKey := os.Getenv("AWS_ACCESS_KEY_ID")
awsSecret := os.Getenv("AWS_SECRET_ACCESS_KEY")
if awsSecret == "" {
return nil, fmt.Errorf("AWS_SECRET_ACCESS_KEY must be set")
}
+ awsRoleArn := os.Getenv("AWS_ROLE_ARN")

- awsRegion := os.Getenv("AWS_REGION")
- if awsRegion == "" {
- return nil, fmt.Errorf("AWS_REGION must be set")
+ // one of (awsKey and awsSecret) or awsRoleArn must be set
+ if awsKey == "" && awsSecret == "" && awsRoleArn == "" {
+ return nil, fmt.Errorf("one of (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) or AWS_ROLE_ARN must be set")
}

return &AWSSecrets{
AccessKeyID: awsKey,
SecretAccessKey: awsSecret,
+ AwsRoleArn: awsRoleArn,
Region: awsRegion,
}, nil
}
@@ -60,3 +65,17 @@ func NewS3BucketAndPrefix(s3Path string) (*S3BucketAndPrefix, error) {
Prefix: prefix,
}, nil
}

+ func CreateS3Client() (*s3.S3, error) {
+ awsSecrets, err := GetAWSSecrets()
+ if err != nil {
+ return nil, fmt.Errorf("failed to get AWS secrets: %w", err)
+ }
+
+ sess := session.Must(session.NewSession(&aws.Config{
+ Region: aws.String(awsSecrets.Region),
+ }))
+
+ s3svc := s3.New(sess)
+ return s3svc, nil
+ }
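A minimal usage sketch for the two helpers above, with placeholder environment values. Note that CreateS3Client itself pins only the region, so when AWS_ROLE_ARN is the configured credential, actual resolution is left to the SDK's default credential chain.

    package main

    import (
        "log"
        "os"

        "github.com/PeerDB-io/peer-flow/connectors/utils"
    )

    func main() {
        // AWS_REGION is always required; static keys or a role ARN must
        // also be present for GetAWSSecrets to succeed.
        os.Setenv("AWS_REGION", "us-east-1")                                // placeholder
        os.Setenv("AWS_ROLE_ARN", "arn:aws:iam::123456789012:role/example") // placeholder

        if _, err := utils.GetAWSSecrets(); err != nil {
            log.Fatalf("missing AWS configuration: %v", err)
        }

        // The returned *s3.S3 satisfies s3iface.S3API, so it plugs directly
        // into s3manager.NewUploaderWithClient and NewBatchDeleteWithClient.
        s3svc, err := utils.CreateS3Client()
        if err != nil {
            log.Fatalf("failed to create S3 client: %v", err)
        }
        _ = s3svc
    }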
42 changes: 42 additions & 0 deletions flow/e2e/qrep_flow_test.go
@@ -490,6 +490,48 @@ func (s *E2EPeerFlowTestSuite) Test_Complete_QRep_Flow_Avro_SF_S3() {
env.AssertExpectations(s.T())
}

+ func (s *E2EPeerFlowTestSuite) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() {
+ env := s.NewTestWorkflowEnvironment()
+ registerWorkflowsAndActivities(env)
+
+ numRows := 10
+
+ tblName := "test_qrep_flow_avro_sf_s3_int"
+ s.setupSourceTable(tblName, numRows)
+ s.setupSFDestinationTable(tblName)
+
+ dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)
+
+ query := fmt.Sprintf("SELECT * FROM e2e_test.%s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}", tblName)
+
+ sfPeer := s.sfHelper.Peer
+ sfPeer.GetSnowflakeConfig().S3Integration = "peerdb_s3_integration"
+
+ qrepConfig := s.createQRepWorkflowConfig(
+ "test_qrep_flow_avro_sf_int",
+ "e2e_test."+tblName,
+ dstSchemaQualified,
+ query,
+ protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO,
+ sfPeer,
+ )
+ qrepConfig.StagingPath = "s3://peerdb-test-bucket/avro"
+
+ runQrepFlowWorkflow(env, qrepConfig)
+
+ // Verify workflow completes without error
+ s.True(env.IsWorkflowCompleted())
+
+ // the workflow error should be nil
+ err := env.GetWorkflowError()
+ s.NoError(err)
+
+ sel := getOwnersSelectorString()
+ s.compareTableContentsSF(tblName, sel)
+
+ env.AssertExpectations(s.T())
+ }

func runQrepFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.QRepConfig) {
lastPartition := &protos.QRepPartition{
PartitionId: "not-applicable-partition",
