diff --git a/.github/workflows/cleanup.yml b/.github/workflows/cleanup.yml
index adf7e24039..5897eae7fd 100644
--- a/.github/workflows/cleanup.yml
+++ b/.github/workflows/cleanup.yml
@@ -54,20 +54,5 @@ jobs:
         run: go run main.go
         working-directory: ./e2e_cleanup
         env:
-          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
-          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
-          AWS_REGION: ${{ secrets.AWS_REGION }}
           TEST_BQ_CREDS: ${{ github.workspace }}/bq_service_account.json
           TEST_SF_CREDS: ${{ github.workspace }}/snowflake_creds.json
-          TEST_S3_CREDS: ${{ github.workspace }}/s3_creds.json
-          TEST_GCS_CREDS: ${{ github.workspace }}/gcs_creds.json
-          AZURE_TENANT_ID: ${{ secrets.AZURE_TENANT_ID }}
-          AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }}
-          AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
-          AZURE_SUBSCRIPTION_ID: ${{ secrets.AZURE_SUBSCRIPTION_ID }}
-          ENABLE_SQLSERVER_TESTS: true
-          SQLSERVER_HOST: ${{ secrets.SQLSERVER_HOST }}
-          SQLSERVER_PORT: ${{ secrets.SQLSERVER_PORT }}
-          SQLSERVER_USER: ${{ secrets.SQLSERVER_USER }}
-          SQLSERVER_PASSWORD: ${{ secrets.SQLSERVER_PASSWORD }}
-          SQLSERVER_DB: ${{ secrets.SQLSERVER_DB }}
diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml
index ab9366d487..2673bda3f8 100644
--- a/.github/workflows/flow.yml
+++ b/.github/workflows/flow.yml
@@ -31,6 +31,16 @@ jobs:
           discovery.type: single-node
           xpack.security.enabled: false
           xpack.security.enrollment.enabled: false
+      minio:
+        image: bitnami/minio:2024.11.7
+        ports:
+          - 9999:9999
+        env:
+          MINIO_ROOT_USER: minio
+          MINIO_ROOT_PASSWORD: miniosecret
+          MINIO_API_PORT_NUMBER: 9999
+          AWS_EC2_METADATA_DISABLED: true
+          MINIO_DEFAULT_BUCKETS: peerdb

    steps:
      - uses: actions/checkout@v4
@@ -103,8 +113,20 @@ jobs:
        with:
          version: "latest"

-      - name: start clickhouse
-        uses: getsentry/action-clickhouse-in-ci@v1
+      - uses: ubicloud/cache@v4
+        id: cache-clickhouse
+        with:
+          path: ./clickhouse
+          key: ${{ runner.os }}-clickhouse
+
+      - name: Install ClickHouse
+        if: steps.cache-clickhouse.outputs.cache-hit != 'true'
+        run: |
+          curl https://clickhouse.com | sh
+
+      - name: Run ClickHouse
+        run: |
+          ./clickhouse server &

      - name: Install Temporal CLI
        uses: temporalio/setup-temporal@v0
@@ -119,9 +141,20 @@ jobs:
          go test -p 32 ./... -timeout 900s
        working-directory: ./flow
        env:
-          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
-          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
-          AWS_REGION: ${{ secrets.AWS_REGION }}
+          AWS_ENDPOINT_URL_S3: http://localhost:9999
+          AWS_ACCESS_KEY_ID: minio
+          AWS_SECRET_ACCESS_KEY: miniosecret
+          AWS_REGION: us-east-1
+          PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID: minio
+          PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY: miniosecret
+          PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_REGION: us-east-1
+          PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ENDPOINT_URL_S3: http://localhost:9999
+          PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME: peerdb
+          PEERDB_SNOWFLAKE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID: minio
+          PEERDB_SNOWFLAKE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY: miniosecret
+          PEERDB_SNOWFLAKE_AWS_CREDENTIALS_AWS_REGION: us-east-1
+          PEERDB_SNOWFLAKE_AWS_CREDENTIALS_AWS_ENDPOINT_URL_S3: http://localhost:9999
+          PEERDB_SNOWFLAKE_AWS_S3_BUCKET_NAME: peerdb
          TEST_BQ_CREDS: ${{ github.workspace }}/bq_service_account.json
          TEST_SF_CREDS: ${{ github.workspace }}/snowflake_creds.json
          TEST_S3_CREDS: ${{ github.workspace }}/s3_creds.json
diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml
index 4807bde8b8..7110819257 100644
--- a/docker-compose-dev.yml
+++ b/docker-compose-dev.yml
@@ -209,7 +209,7 @@ services:
      - flow-api

  minio:
-    image: minio/minio:RELEASE.2024-07-16T23-46-41Z
+    image: minio/minio:RELEASE.2024-11-07T00-52-20Z
    volumes:
      - minio-data:/data
    ports:
diff --git a/docker-compose.yml b/docker-compose.yml
index 393549f892..ce4a3994ad 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -184,7 +184,7 @@ services:
      - flow-api

  minio:
-    image: minio/minio:RELEASE.2024-07-16T23-46-41Z
+    image: minio/minio:RELEASE.2024-11-07T00-52-20Z
    restart: unless-stopped
    volumes:
      - minio-data:/data
diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go
index 46bec64fbc..4e89757014 100644
--- a/flow/connectors/clickhouse/clickhouse.go
+++ b/flow/connectors/clickhouse/clickhouse.go
@@ -33,7 +33,6 @@ type ClickHouseConnector struct {
    logger log.Logger
    config *protos.ClickhouseConfig
    credsProvider *utils.ClickHouseS3Credentials
-    s3Stage *ClickHouseS3Stage
 }

 func ValidateS3(ctx context.Context, creds *utils.ClickHouseS3Credentials) error {
@@ -153,12 +152,10 @@ func NewClickHouseConnector(
    }

    awsBucketPath := config.S3Path
-
    if awsBucketPath == "" {
        deploymentUID := peerdbenv.PeerDBDeploymentUID()
        flowName, _ := ctx.Value(shared.FlowNameKey).(string)
-        bucketPathSuffix := fmt.Sprintf("%s/%s",
-            url.PathEscape(deploymentUID), url.PathEscape(flowName))
+        bucketPathSuffix := fmt.Sprintf("%s/%s", url.PathEscape(deploymentUID), url.PathEscape(flowName))
        // Fallback: Get S3 credentials from environment
        awsBucketName, err := peerdbenv.PeerDBClickHouseAWSS3BucketName(ctx, env)
        if err != nil {
@@ -170,10 +167,7 @@ func NewClickHouseConnector(
        awsBucketPath = fmt.Sprintf("s3://%s/%s", awsBucketName, bucketPathSuffix)
    }
-    clickHouseS3CredentialsNew := utils.ClickHouseS3Credentials{
-        Provider: credentialsProvider,
-        BucketPath: awsBucketPath,
-    }
+
    credentials, err := credentialsProvider.Retrieve(ctx)
    if err != nil {
        return nil, err
    }
@@ -184,8 +178,10 @@ func NewClickHouseConnector(
        PostgresMetadata: pgMetadata,
        config: config,
        logger: logger,
-        credsProvider: &clickHouseS3CredentialsNew,
-        s3Stage: NewClickHouseS3Stage(),
+        credsProvider: &utils.ClickHouseS3Credentials{
+            Provider: credentialsProvider,
+            BucketPath: awsBucketPath,
+        },
    }

    if credentials.AWS.SessionToken != "" {
diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
index 4474c6118d..770abc7f20 100644
--- a/flow/connectors/clickhouse/normalize.go
+++ b/flow/connectors/clickhouse/normalize.go
@@ -493,7 +493,7 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch(

 func (c *ClickHouseConnector) copyAvroStageToDestination(ctx context.Context, flowJobName string, syncBatchID int64) error {
    avroSyncMethod := c.avroSyncMethod(flowJobName)
-    avroFile, err := c.s3Stage.GetAvroStage(ctx, flowJobName, syncBatchID)
+    avroFile, err := GetAvroStage(ctx, flowJobName, syncBatchID)
    if err != nil {
        return fmt.Errorf("failed to get avro stage: %w", err)
    }
diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go
index edbd0392c9..f8277e3aad 100644
--- a/flow/connectors/clickhouse/qrep_avro_sync.go
+++ b/flow/connectors/clickhouse/qrep_avro_sync.go
@@ -18,8 +18,8 @@ import (
 )

 type ClickHouseAvroSyncMethod struct {
-    config *protos.QRepConfig
-    connector *ClickHouseConnector
+    *ClickHouseConnector
+    config *protos.QRepConfig
 }

 func NewClickHouseAvroSyncMethod(
@@ -27,22 +27,22 @@ func NewClickHouseAvroSyncMethod(
    connector *ClickHouseConnector,
 ) *ClickHouseAvroSyncMethod {
    return &ClickHouseAvroSyncMethod{
-        config: config,
-        connector: connector,
+        ClickHouseConnector: connector,
+        config: config,
    }
 }

 func (s *ClickHouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, avroFile *avro.AvroFile) error {
-    stagingPath := s.connector.credsProvider.BucketPath
+    stagingPath := s.credsProvider.BucketPath
    s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
    if err != nil {
        return err
    }

-    endpoint := s.connector.credsProvider.Provider.GetEndpointURL()
-    region := s.connector.credsProvider.Provider.GetRegion()
+    endpoint := s.credsProvider.Provider.GetEndpointURL()
+    region := s.credsProvider.Provider.GetRegion()
    avroFileUrl := utils.FileURLForS3Service(endpoint, region, s3o.Bucket, avroFile.FilePath)
-    creds, err := s.connector.credsProvider.Provider.Retrieve(ctx)
+    creds, err := s.credsProvider.Provider.Retrieve(ctx)
    if err != nil {
        return err
    }
@@ -55,7 +55,7 @@ func (s *ClickHouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a
        s.config.DestinationTableIdentifier, avroFileUrl,
        creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)

-    return s.connector.database.Exec(ctx, query)
+    return s.database.Exec(ctx, query)
 }

 func (s *ClickHouseAvroSyncMethod) SyncRecords(
@@ -67,7 +67,7 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords(
    dstTableName := s.config.DestinationTableIdentifier
    schema := stream.Schema()

-    s.connector.logger.Info("sync function called and schema acquired",
+    s.logger.Info("sync function called and schema acquired",
        slog.String("dstTable", dstTableName))

    avroSchema, err := s.getAvroSchema(dstTableName, schema)
@@ -81,14 +81,13 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords(
        return 0, err
    }

-    s.connector.logger.Info("[SyncRecords] written records to Avro file",
+    s.logger.Info("[SyncRecords] written records to Avro file",
        slog.String("dstTable", dstTableName),
        slog.String("avroFile", avroFile.FilePath),
        slog.Int("numRecords", avroFile.NumRecords),
        slog.Int64("syncBatchID", syncBatchID))

-    err = s.connector.s3Stage.SetAvroStage(ctx, flowJobName, syncBatchID, avroFile)
-    if err != nil {
+    if err := SetAvroStage(ctx, flowJobName, syncBatchID, avroFile); err != nil {
        return 0, fmt.Errorf("failed to set avro stage: %w", err)
    }

@@ -103,7 +102,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
    stream *model.QRecordStream,
 ) (int, error) {
    dstTableName := config.DestinationTableIdentifier
-    stagingPath := s.connector.credsProvider.BucketPath
+    stagingPath := s.credsProvider.BucketPath
    startTime := time.Now()

    avroSchema, err := s.getAvroSchema(dstTableName, stream.Schema())
@@ -121,13 +120,13 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
        return 0, err
    }

-    creds, err := s.connector.credsProvider.Provider.Retrieve(ctx)
+    creds, err := s.credsProvider.Provider.Retrieve(ctx)
    if err != nil {
        return 0, err
    }

-    endpoint := s.connector.credsProvider.Provider.GetEndpointURL()
-    region := s.connector.credsProvider.Provider.GetRegion()
+    endpoint := s.credsProvider.Provider.GetEndpointURL()
+    region := s.credsProvider.Provider.GetRegion()
    avroFileUrl := utils.FileURLForS3Service(endpoint, region, s3o.Bucket, avroFile.FilePath)
    selector := make([]string, 0, len(dstTableSchema))
    for _, col := range dstTableSchema {
@@ -151,13 +150,13 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
        config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl,
        creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)

-    if err := s.connector.database.Exec(ctx, query); err != nil {
-        s.connector.logger.Error("Failed to insert into select for ClickHouse", slog.Any("error", err))
+    if err := s.database.Exec(ctx, query); err != nil {
+        s.logger.Error("Failed to insert into select for ClickHouse", slog.Any("error", err))
        return 0, err
    }

-    if err := s.connector.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime); err != nil {
-        s.connector.logger.Error("Failed to finish QRep partition", slog.Any("error", err))
+    if err := s.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime); err != nil {
+        s.logger.Error("Failed to finish QRep partition", slog.Any("error", err))
        return 0, err
    }

@@ -182,7 +181,7 @@ func (s *ClickHouseAvroSyncMethod) writeToAvroFile(
    identifierForFile string,
    flowJobName string,
 ) (*avro.AvroFile, error) {
-    stagingPath := s.connector.credsProvider.BucketPath
+    stagingPath := s.credsProvider.BucketPath
    ocfWriter := avro.NewPeerDBOCFWriter(stream, avroSchema, avro.CompressZstd, protos.DBType_CLICKHOUSE)
    s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
    if err != nil {
@@ -191,7 +190,7 @@ func (s *ClickHouseAvroSyncMethod) writeToAvroFile(

    s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, identifierForFile)
    s3AvroFileKey = strings.Trim(s3AvroFileKey, "/")
-    avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, s.connector.credsProvider.Provider)
+    avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, s.credsProvider.Provider)
    if err != nil {
        return nil, fmt.Errorf("failed to write records to S3: %w", err)
    }
diff --git a/flow/connectors/clickhouse/s3_stage.go b/flow/connectors/clickhouse/s3_stage.go
index b4ca7d71c2..5f5eb899a4 100644
--- a/flow/connectors/clickhouse/s3_stage.go
+++ b/flow/connectors/clickhouse/s3_stage.go
@@ -6,19 +6,12 @@ import (
    "fmt"

    "github.com/jackc/pgx/v5"
-    "github.com/jackc/pgx/v5/pgxpool"

    utils "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
    "github.com/PeerDB-io/peer-flow/peerdbenv"
 )

-type ClickHouseS3Stage struct{}
-
-func NewClickHouseS3Stage() *ClickHouseS3Stage {
-    return &ClickHouseS3Stage{}
-}
-
-func (c *ClickHouseS3Stage) SetAvroStage(
+func SetAvroStage(
    ctx context.Context,
    flowJobName string,
    syncBatchID int64,
@@ -29,36 +22,36 @@ func (c *ClickHouseS3Stage) SetAvroStage(
        return fmt.Errorf("failed to marshal avro file: %w", err)
    }

-    conn, err := c.getConn(ctx)
+    conn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx)
    if err != nil {
        return fmt.Errorf("failed to get connection: %w", err)
    }

-    _, err = conn.Exec(ctx, `
+    if _, err := conn.Exec(ctx, `
        INSERT INTO ch_s3_stage (flow_job_name, sync_batch_id, avro_file)
        VALUES ($1, $2, $3)
        ON CONFLICT (flow_job_name, sync_batch_id)
-        DO UPDATE SET avro_file = $3, created_at = CURRENT_TIMESTAMP
-    `, flowJobName, syncBatchID, avroFileJSON)
-    if err != nil {
+        DO UPDATE SET avro_file = $3, created_at = CURRENT_TIMESTAMP`,
+        flowJobName, syncBatchID, avroFileJSON,
+    ); err != nil {
        return fmt.Errorf("failed to set avro stage: %w", err)
    }

    return nil
 }

-func (c *ClickHouseS3Stage) GetAvroStage(ctx context.Context, flowJobName string, syncBatchID int64) (*utils.AvroFile, error) {
-    conn, err := c.getConn(ctx)
+func GetAvroStage(ctx context.Context, flowJobName string, syncBatchID int64) (*utils.AvroFile, error) {
+    conn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx)
    if err != nil {
        return nil, fmt.Errorf("failed to get connection: %w", err)
    }

    var avroFileJSON []byte
-    err = conn.QueryRow(ctx, `
+    if err := conn.QueryRow(ctx, `
        SELECT avro_file FROM ch_s3_stage
-        WHERE flow_job_name = $1 AND sync_batch_id = $2
-    `, flowJobName, syncBatchID).Scan(&avroFileJSON)
-    if err != nil {
+        WHERE flow_job_name = $1 AND sync_batch_id = $2`,
+        flowJobName, syncBatchID,
+    ).Scan(&avroFileJSON); err != nil {
        if err == pgx.ErrNoRows {
            return nil, fmt.Errorf("no avro stage found for flow job %s and sync batch %d", flowJobName, syncBatchID)
        }
@@ -72,12 +65,3 @@ func (c *ClickHouseS3Stage) GetAvroStage(ctx context.Context, flowJobName string

    return &avroFile, nil
 }
-
-func (c *ClickHouseS3Stage) getConn(ctx context.Context) (*pgxpool.Pool, error) {
-    conn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx)
-    if err != nil {
-        return nil, fmt.Errorf("unable to create catalog connection pool: %w", err)
-    }
-
-    return conn, nil
-}
diff --git a/flow/connectors/utils/aws.go b/flow/connectors/utils/aws.go
index 89fcab7b22..73eab604ad 100644
--- a/flow/connectors/utils/aws.go
+++ b/flow/connectors/utils/aws.go
@@ -126,12 +126,10 @@ func (s *StaticAWSCredentialsProvider) Retrieve(ctx context.Context) (AWSCredent
 }

 func (s *StaticAWSCredentialsProvider) GetEndpointURL() string {
-    endpoint := ""
    if s.credentials.EndpointUrl != nil {
-        endpoint = *s.credentials.EndpointUrl
+        return *s.credentials.EndpointUrl
    }
-
-    return endpoint
+    return ""
 }

 func NewStaticAWSCredentialsProvider(credentials AWSCredentials, region string) AWSCredentialsProvider {
@@ -209,12 +207,9 @@ func GetAWSCredentialsProvider(ctx context.Context, connectorName string, peerCr
 }

 func FileURLForS3Service(endpoint string, region string, bucket string, filePath string) string {
-    // example: min.io local bucket or GCS
-    matches := s3CompatibleServiceEndpointPattern.MatchString(endpoint)
-    if matches {
+    if s3CompatibleServiceEndpointPattern.MatchString(endpoint) {
        return fmt.Sprintf("%s/%s/%s", endpoint, bucket, filePath)
    }
-
    return fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", bucket, region, filePath)
 }

@@ -238,25 +233,17 @@ func NewS3BucketAndPrefix(s3Path string) (*S3BucketAndPrefix, error) {
 }

 type resolverV2 struct {
-    userProvidedEndpointUrl string
+    url.URL
 }

 func (r *resolverV2) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (
    smithyendpoints.Endpoint, error,
 ) {
-    if r.userProvidedEndpointUrl != "" {
-        u, err := url.Parse(r.userProvidedEndpointUrl)
-        if err != nil {
-            return smithyendpoints.Endpoint{}, err
-        }
-
-        u.Path += "/" + *params.Bucket
-        return smithyendpoints.Endpoint{
-            URI: *u,
-        }, nil
-    }
-
-    return s3.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params)
+    uri := r.URL
+    uri.Path += "/" + *params.Bucket
+    return smithyendpoints.Endpoint{
+        URI: uri,
+    }, nil
 }

 func CreateS3Client(ctx context.Context, credsProvider AWSCredentialsProvider) (*s3.Client, error) {
@@ -265,28 +252,35 @@ func CreateS3Client(ctx context.Context, credsProvider AWSCredentialsProvider) (
        return nil, err
    }

-    s3Client := s3.NewFromConfig(aws.Config{}, func(options *s3.Options) {
-        options.Region = credsProvider.GetRegion()
-        options.Credentials = credsProvider.GetUnderlyingProvider()
-
-        if awsCredentials.EndpointUrl != nil && *awsCredentials.EndpointUrl != "" {
-            options.BaseEndpoint = awsCredentials.EndpointUrl
-            options.EndpointResolverV2 = &resolverV2{
-                userProvidedEndpointUrl: *awsCredentials.EndpointUrl,
-            }
+    options := s3.Options{
+        Region: credsProvider.GetRegion(),
+        Credentials: credsProvider.GetUnderlyingProvider(),
+    }
+    if awsCredentials.EndpointUrl != nil && *awsCredentials.EndpointUrl != "" {
+        options.BaseEndpoint = awsCredentials.EndpointUrl
+        options.UsePathStyle = true
+        url, err := url.Parse(*awsCredentials.EndpointUrl)
+        if err != nil {
+            return nil, err
+        }
+        options.EndpointResolverV2 = &resolverV2{
+            URL: *url,
+        }
+
        if strings.Contains(*awsCredentials.EndpointUrl, "storage.googleapis.com") {
            // Assign custom client with our own transport
            options.HTTPClient = &http.Client{
                Transport: &RecalculateV4Signature{
                    next: http.DefaultTransport,
                    signer: v4.NewSigner(),
                    credentials: credsProvider.GetUnderlyingProvider(),
-                    region: credsProvider.GetRegion(),
+                    region: options.Region,
                },
            }
        }
-    })
-    return s3Client, nil
+    }
+
+    return s3.New(options), nil
 }

 // RecalculateV4Signature allow GCS over S3, removing Accept-Encoding header from sign
@@ -314,8 +308,7 @@ func (lt *RecalculateV4Signature) RoundTrip(req *http.Request) (*http.Response,
    if err != nil {
        return nil, err
    }
-    err = lt.signer.SignHTTP(req.Context(), creds, req, v4.GetPayloadHash(req.Context()), "s3", lt.region, timeDate)
-    if err != nil {
+    if err := lt.signer.SignHTTP(req.Context(), creds, req, v4.GetPayloadHash(req.Context()), "s3", lt.region, timeDate); err != nil {
        return nil, err
    }
    // Reset Accept-Encoding if desired
@@ -331,21 +324,20 @@ func PutAndRemoveS3(ctx context.Context, client *s3.Client, bucket string, prefi
    reader := strings.NewReader(time.Now().Format(time.RFC3339))
    bucketName := aws.String(bucket)
    temporaryObjectPath := prefix + "/" + _peerDBCheck + uuid.New().String()
-    temporaryObjectPath = strings.TrimPrefix(temporaryObjectPath, "/")
-    _, putErr := client.PutObject(ctx, &s3.PutObjectInput{
+    key := aws.String(strings.TrimPrefix(temporaryObjectPath, "/"))
+
+    if _, putErr := client.PutObject(ctx, &s3.PutObjectInput{
        Bucket: bucketName,
-        Key: aws.String(temporaryObjectPath),
+        Key: key,
        Body: reader,
-    })
-    if putErr != nil {
+    }); putErr != nil {
        return fmt.Errorf("failed to write to bucket: %w", putErr)
    }

-    _, delErr := client.DeleteObject(ctx, &s3.DeleteObjectInput{
+    if _, delErr := client.DeleteObject(ctx, &s3.DeleteObjectInput{
        Bucket: bucketName,
-        Key: aws.String(temporaryObjectPath),
-    })
-    if delErr != nil {
+        Key: key,
+    }); delErr != nil {
        return fmt.Errorf("failed to delete from bucket: %w", delErr)
    }

diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go
index e1eafd6b4b..79ff2aa7bb 100644
--- a/flow/e2e/clickhouse/clickhouse.go
+++ b/flow/e2e/clickhouse/clickhouse.go
@@ -56,6 +56,11 @@ func (s ClickHouseSuite) Peer() *protos.Peer {
 }

 func (s ClickHouseSuite) PeerForDatabase(dbname string) *protos.Peer {
+    region := ""
+    if s.s3Helper.S3Config.Region != nil {
+        region = *s.s3Helper.S3Config.Region
+    }
+
    ret := &protos.Peer{
        Name: e2e.AddSuffix(s, dbname),
        Type: protos.DBType_CLICKHOUSE,
@@ -67,7 +72,7 @@ func (s ClickHouseSuite) PeerForDatabase(dbname string) *protos.Peer {
                S3Path: s.s3Helper.BucketName,
                AccessKeyId: *s.s3Helper.S3Config.AccessKeyId,
                SecretAccessKey: *s.s3Helper.S3Config.SecretAccessKey,
-                Region: *s.s3Helper.S3Config.Region,
+                Region: region,
                DisableTls: true,
                Endpoint: s.s3Helper.S3Config.Endpoint,
            },
@@ -188,7 +193,7 @@ func SetupSuite(t *testing.T) ClickHouseSuite {
    conn, err := e2e.SetupPostgres(t, suffix)
    require.NoError(t, err, "failed to setup postgres")

-    s3Helper, err := e2e_s3.NewS3TestHelper(false)
+    s3Helper, err := e2e_s3.NewS3TestHelper(e2e_s3.Minio)
    require.NoError(t, err, "failed to setup S3")

    s := ClickHouseSuite{
diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go
index 8148715148..c52fca7a38 100644
--- a/flow/e2e/s3/qrep_flow_s3_test.go
+++ b/flow/e2e/s3/qrep_flow_s3_test.go
@@ -50,6 +50,7 @@ func (s PeerFlowE2ETestSuiteS3) Peer() *protos.Peer {
 }

 func TestPeerFlowE2ETestSuiteS3(t *testing.T) {
+    t.Skip("skipping AWS, CI credentials revoked") // TODO fix CI
    e2eshared.RunSuite(t, SetupSuiteS3)
 }

@@ -57,14 +58,16 @@ func TestPeerFlowE2ETestSuiteGCS(t *testing.T) {
    e2eshared.RunSuite(t, SetupSuiteGCS)
 }

+func TestPeerFlowE2ETestSuiteMinIO(t *testing.T) {
+    e2eshared.RunSuite(t, SetupSuiteMinIO)
+}
+
 func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) {
-    err := e2e.CreateTableForQRep(s.conn.Conn(), s.suffix, tableName)
-    require.NoError(s.t, err)
-    err = e2e.PopulateSourceTable(s.conn.Conn(), s.suffix, tableName, rowCount)
-    require.NoError(s.t, err)
+    require.NoError(s.t, e2e.CreateTableForQRep(s.conn.Conn(), s.suffix, tableName))
+    require.NoError(s.t, e2e.PopulateSourceTable(s.conn.Conn(), s.suffix, tableName, rowCount))
 }

-func setupSuite(t *testing.T, gcs bool) PeerFlowE2ETestSuiteS3 {
+func setupSuite(t *testing.T, s3environment S3Environment) PeerFlowE2ETestSuiteS3 {
    t.Helper()

    suffix := "s3_" + strings.ToLower(shared.RandomString(8))
@@ -73,7 +76,7 @@ func setupSuite(t *testing.T, s3environment S3Environment) PeerFlowE2ETestSuiteS3 {
        require.Fail(t, "failed to setup postgres", err)
    }

-    helper, err := NewS3TestHelper(gcs)
+    helper, err := NewS3TestHelper(s3environment)
    if err != nil {
        require.Fail(t, "failed to setup S3", err)
    }
@@ -97,12 +100,17 @@ func (s PeerFlowE2ETestSuiteS3) Teardown() {

 func SetupSuiteS3(t *testing.T) PeerFlowE2ETestSuiteS3 {
    t.Helper()
-    return setupSuite(t, false)
+    return setupSuite(t, Aws)
 }

 func SetupSuiteGCS(t *testing.T) PeerFlowE2ETestSuiteS3 {
    t.Helper()
-    return setupSuite(t, true)
+    return setupSuite(t, Gcs)
+}
+
+func SetupSuiteMinIO(t *testing.T) PeerFlowE2ETestSuiteS3 {
+    t.Helper()
+    return setupSuite(t, Minio)
 }

 func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go
index 20ac3e9039..af6be64f94 100644
--- a/flow/e2e/s3/s3_helper.go
+++ b/flow/e2e/s3/s3_helper.go
@@ -24,28 +24,48 @@ type S3TestHelper struct {
    prefix string
 }

-func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) {
-    credsPath := os.Getenv("TEST_S3_CREDS")
-    bucketName := "peerdb-test-bucket"
-    if switchToGCS {
+type S3Environment int
+
+const (
+    Aws S3Environment = iota
+    Gcs
+    Minio
+)
+
+func NewS3TestHelper(s3environment S3Environment) (*S3TestHelper, error) {
+    var config utils.S3PeerCredentials
+    var endpoint string
+    var credsPath string
+    var bucketName string
+    switch s3environment {
+    case Aws:
+        credsPath = os.Getenv("TEST_S3_CREDS")
+        bucketName = "peerdb-test-bucket"
+    case Gcs:
        credsPath = os.Getenv("TEST_GCS_CREDS")
        bucketName = "peerdb_staging"
+        endpoint = "https://storage.googleapis.com"
+    case Minio:
+        bucketName = "peerdb"
+        endpoint = os.Getenv("AWS_ENDPOINT_URL_S3")
+        config.AccessKeyID = os.Getenv("AWS_ACCESS_KEY_ID")
+        config.SecretAccessKey = os.Getenv("AWS_SECRET_ACCESS_KEY")
+        config.Region = os.Getenv("AWS_REGION")
+    default:
+        panic(fmt.Sprintf("invalid s3environment %d", s3environment))
    }

-    content, err := e2eshared.ReadFileToBytes(credsPath)
-    if err != nil {
-        return nil, fmt.Errorf("failed to read file: %w", err)
-    }
+    if credsPath != "" {
+        content, err := e2eshared.ReadFileToBytes(credsPath)
+        if err != nil {
+            return nil, fmt.Errorf("failed to read file: %w", err)
+        }

-    var config utils.S3PeerCredentials
-    err = json.Unmarshal(content, &config)
-    if err != nil {
-        return nil, fmt.Errorf("failed to unmarshal json: %w", err)
-    }
-    endpoint := ""
-    if switchToGCS {
-        endpoint = "https://storage.googleapis.com"
+        if err := json.Unmarshal(content, &config); err != nil {
+            return nil, fmt.Errorf("failed to unmarshal json: %w", err)
+        }
    }
+
    var endpointUrlPtr *string
    if endpoint != "" {
        endpointUrlPtr = &endpoint
@@ -62,6 +82,7 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) {
    if err != nil {
        return nil, err
    }
+
    prefix := fmt.Sprintf("peerdb_test/%d_%s", time.Now().Unix(), shared.RandomString(6))
    return &S3TestHelper{
        client,
@@ -106,13 +127,10 @@ func (h *S3TestHelper) CleanUp(ctx context.Context) error {

    // Delete each object
    for _, obj := range files.Contents {
-        deleteInput := &s3.DeleteObjectInput{
+        if _, err := h.client.DeleteObject(ctx, &s3.DeleteObjectInput{
            Bucket: &h.BucketName,
            Key: obj.Key,
-        }
-
-        _, err := h.client.DeleteObject(ctx, deleteInput)
-        if err != nil {
+        }); err != nil {
            return err
        }
    }
diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go
index 7fed8ada0a..0f86ce7670 100644
--- a/flow/e2e/snowflake/qrep_flow_sf_test.go
+++ b/flow/e2e/snowflake/qrep_flow_sf_test.go
@@ -123,6 +123,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple()
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
+    s.t.Skip("aws s3 broken in ci") // TODO fix
    tc := e2e.NewTemporalClient(s.t)

    numRows := 10
@@ -199,6 +200,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() {
+    s.t.Skip("aws s3 broken in ci") // TODO fix
    tc := e2e.NewTemporalClient(s.t)

    numRows := 10