From 08f4ffd6f7da7bce6f5058a9b3e838f61a5fe6ef Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Thu, 14 Nov 2024 13:13:32 +0000
Subject: [PATCH] add dynamic config to adjust s3 part size (#2251)

Fixes #2184, where a user with a large enough batch hit:

```
failed to sync records: failed to write records to S3: failed to upload file to path s3://peerdb-cache/...... 4890f21240e1.avro.zst: upload multipart failed, upload id: OTA0ZTE5NTMtMTdiMi00MWE5LWJhY....., cause: exceeded total allowed configured MaxUploadParts (10000). Adjust PartSize to fit in this limit
```

The S3 SDK defaults to 5MiB part sizes, which with the 10000-part cap limits a single upload to roughly 48.8GiB. This user was able to fix their upload by raising the part size to 500MiB.

Co-authored-by: joltcan
---
 flow/connectors/clickhouse/cdc.go            |  2 +-
 flow/connectors/clickhouse/qrep_avro_sync.go |  8 ++-
 flow/connectors/s3/qrep.go                   |  5 +-
 flow/connectors/snowflake/qrep_avro_sync.go  | 67 ++++++++++----------
 flow/connectors/snowflake/snowflake.go       |  5 +-
 flow/connectors/utils/avro/avro_writer.go    | 24 +++++--
 flow/peerdbenv/dynamicconf.go                | 12 ++++
 7 files changed, 76 insertions(+), 47 deletions(-)

diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go
index 8fae9d6f26..d3eb883b46 100644
--- a/flow/connectors/clickhouse/cdc.go
+++ b/flow/connectors/clickhouse/cdc.go
@@ -88,7 +88,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
 	}
 
 	avroSyncer := c.avroSyncMethod(req.FlowJobName)
-	numRecords, err := avroSyncer.SyncRecords(ctx, stream, req.FlowJobName, syncBatchID)
+	numRecords, err := avroSyncer.SyncRecords(ctx, req.Env, stream, req.FlowJobName, syncBatchID)
 	if err != nil {
 		return nil, err
 	}
diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go
index f8277e3aad..fa2cfe1034 100644
--- a/flow/connectors/clickhouse/qrep_avro_sync.go
+++ b/flow/connectors/clickhouse/qrep_avro_sync.go
@@ -60,6 +60,7 @@ func (s *ClickHouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a
 
 func (s *ClickHouseAvroSyncMethod) SyncRecords(
 	ctx context.Context,
+	env map[string]string,
 	stream *model.QRecordStream,
 	flowJobName string,
 	syncBatchID int64,
@@ -76,7 +77,7 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords(
 	}
 
 	batchIdentifierForFile := fmt.Sprintf("%s_%d", shared.RandomString(16), syncBatchID)
-	avroFile, err := s.writeToAvroFile(ctx, stream, avroSchema, batchIdentifierForFile, flowJobName)
+	avroFile, err := s.writeToAvroFile(ctx, env, stream, avroSchema, batchIdentifierForFile, flowJobName)
 	if err != nil {
 		return 0, err
 	}
@@ -110,7 +111,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
 		return 0, err
 	}
 
-	avroFile, err := s.writeToAvroFile(ctx, stream, avroSchema, partition.PartitionId, config.FlowJobName)
+	avroFile, err := s.writeToAvroFile(ctx, config.Env, stream, avroSchema, partition.PartitionId, config.FlowJobName)
 	if err != nil {
 		return 0, err
 	}
@@ -176,6 +177,7 @@ func (s *ClickHouseAvroSyncMethod) getAvroSchema(
 
 func (s *ClickHouseAvroSyncMethod) writeToAvroFile(
 	ctx context.Context,
+	env map[string]string,
 	stream *model.QRecordStream,
 	avroSchema *model.QRecordAvroSchemaDefinition,
 	identifierForFile string,
@@ -190,7 +192,7 @@ func (s *ClickHouseAvroSyncMethod) writeToAvroFile(
 	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, identifierForFile)
 	s3AvroFileKey = strings.Trim(s3AvroFileKey, "/")
 
-	avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, s.credsProvider.Provider)
+	avroFile, err := ocfWriter.WriteRecordsToS3(ctx, env, s3o.Bucket, s3AvroFileKey, s.credsProvider.Provider)
 	if err != nil {
 		return nil, fmt.Errorf("failed to write records to S3: %w", err)
 	}
diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go
index 14c7b31ef2..9fbb485ab8 100644
--- a/flow/connectors/s3/qrep.go
+++ b/flow/connectors/s3/qrep.go
@@ -25,7 +25,7 @@ func (c *S3Connector) SyncQRepRecords(
 		return 0, err
 	}
 
-	numRecords, err := c.writeToAvroFile(ctx, stream, avroSchema, partition.PartitionId, config.FlowJobName)
+	numRecords, err := c.writeToAvroFile(ctx, config.Env, stream, avroSchema, partition.PartitionId, config.FlowJobName)
 	if err != nil {
 		return 0, err
 	}
@@ -47,6 +47,7 @@ func getAvroSchema(
 
 func (c *S3Connector) writeToAvroFile(
 	ctx context.Context,
+	env map[string]string,
 	stream *model.QRecordStream,
 	avroSchema *model.QRecordAvroSchemaDefinition,
 	partitionID string,
@@ -60,7 +61,7 @@ func (c *S3Connector) writeToAvroFile(
 	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, jobName, partitionID)
 
 	writer := avro.NewPeerDBOCFWriter(stream, avroSchema, avro.CompressNone, protos.DBType_SNOWFLAKE)
-	avroFile, err := writer.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, c.credentialsProvider)
+	avroFile, err := writer.WriteRecordsToS3(ctx, env, s3o.Bucket, s3AvroFileKey, c.credentialsProvider)
 	if err != nil {
 		return 0, fmt.Errorf("failed to write records to S3: %w", err)
 	}
diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go
index 2e37705c14..0fea54b027 100644
--- a/flow/connectors/snowflake/qrep_avro_sync.go
+++ b/flow/connectors/snowflake/qrep_avro_sync.go
@@ -20,8 +20,8 @@ import (
 )
 
 type SnowflakeAvroSyncHandler struct {
-	config    *protos.QRepConfig
-	connector *SnowflakeConnector
+	*SnowflakeConnector
+	config *protos.QRepConfig
 }
 
 func NewSnowflakeAvroSyncHandler(
@@ -29,13 +29,14 @@ func NewSnowflakeAvroSyncHandler(
 	connector *SnowflakeConnector,
 ) *SnowflakeAvroSyncHandler {
 	return &SnowflakeAvroSyncHandler{
-		config:    config,
-		connector: connector,
+		SnowflakeConnector: connector,
+		config:             config,
 	}
 }
 
 func (s *SnowflakeAvroSyncHandler) SyncRecords(
 	ctx context.Context,
+	env map[string]string,
 	dstTableSchema []*sql.ColumnType,
 	stream *model.QRecordStream,
 	flowJobName string,
@@ -45,7 +46,7 @@ func (s *SnowflakeAvroSyncHandler) SyncRecords(
 
 	schema := stream.Schema()
 
-	s.connector.logger.Info("sync function called and schema acquired", tableLog)
+	s.logger.Info("sync function called and schema acquired", tableLog)
 
 	avroSchema, err := s.getAvroSchema(dstTableName, schema)
 	if err != nil {
@@ -53,32 +54,31 @@ func (s *SnowflakeAvroSyncHandler) SyncRecords(
 	}
 
 	partitionID := shared.RandomString(16)
-	avroFile, err := s.writeToAvroFile(ctx, stream, avroSchema, partitionID, flowJobName)
+	avroFile, err := s.writeToAvroFile(ctx, env, stream, avroSchema, partitionID, flowJobName)
 	if err != nil {
 		return 0, err
 	}
 	defer avroFile.Cleanup()
-	s.connector.logger.Info(fmt.Sprintf("written %d records to Avro file", avroFile.NumRecords), tableLog)
+	s.logger.Info(fmt.Sprintf("written %d records to Avro file", avroFile.NumRecords), tableLog)
 
-	stage := s.connector.getStageNameForJob(s.config.FlowJobName)
-	err = s.connector.createStage(ctx, stage, s.config)
-	if err != nil {
+	stage := s.getStageNameForJob(s.config.FlowJobName)
+	if err := s.createStage(ctx, stage, s.config); err != nil {
 		return 0, err
 	}
-	s.connector.logger.Info("Created stage " + stage)
+	s.logger.Info("Created stage " + stage)
 
 	err = s.putFileToStage(ctx, avroFile, stage)
 	if err != nil {
 		return 0, err
 	}
-	s.connector.logger.Info("pushed avro file to stage", tableLog)
+	s.logger.Info("pushed avro file to stage", tableLog)
 
-	writeHandler := NewSnowflakeAvroConsolidateHandler(s.connector, s.config, s.config.DestinationTableIdentifier, stage)
+	writeHandler := NewSnowflakeAvroConsolidateHandler(s.SnowflakeConnector, s.config, s.config.DestinationTableIdentifier, stage)
 	err = writeHandler.CopyStageToDestination(ctx)
 	if err != nil {
 		return 0, err
 	}
-	s.connector.logger.Info(fmt.Sprintf("copying records into %s from stage %s",
+	s.logger.Info(fmt.Sprintf("copying records into %s from stage %s",
 		s.config.DestinationTableIdentifier, stage))
 
 	return avroFile.NumRecords, nil
@@ -96,7 +96,7 @@ func (s *SnowflakeAvroSyncHandler) SyncQRepRecords(
 	dstTableName := config.DestinationTableIdentifier
 	schema := stream.Schema()
 
-	s.connector.logger.Info("sync function called and schema acquired", partitionLog)
+	s.logger.Info("sync function called and schema acquired", partitionLog)
 
 	err := s.addMissingColumns(ctx, schema, dstTableSchema, dstTableName, partition)
 	if err != nil {
@@ -108,22 +108,20 @@ func (s *SnowflakeAvroSyncHandler) SyncQRepRecords(
 		return 0, err
 	}
 
-	avroFile, err := s.writeToAvroFile(ctx, stream, avroSchema, partition.PartitionId, config.FlowJobName)
+	avroFile, err := s.writeToAvroFile(ctx, config.Env, stream, avroSchema, partition.PartitionId, config.FlowJobName)
 	if err != nil {
 		return 0, err
 	}
 	defer avroFile.Cleanup()
 
-	stage := s.connector.getStageNameForJob(config.FlowJobName)
+	stage := s.getStageNameForJob(config.FlowJobName)
 
-	err = s.putFileToStage(ctx, avroFile, stage)
-	if err != nil {
+	if err := s.putFileToStage(ctx, avroFile, stage); err != nil {
 		return 0, err
 	}
-	s.connector.logger.Info("Put file to stage in Avro sync for snowflake", partitionLog)
+	s.logger.Info("Put file to stage in Avro sync for snowflake", partitionLog)
 
-	err = s.connector.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime)
-	if err != nil {
+	if err := s.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime); err != nil {
 		return 0, err
 	}
 
@@ -152,14 +150,14 @@ func (s *SnowflakeAvroSyncHandler) addMissingColumns(
 		}
 
 		if !hasColumn {
-			s.connector.logger.Info(fmt.Sprintf("adding column %s to destination table %s",
+			s.logger.Info(fmt.Sprintf("adding column %s to destination table %s",
 				col.Name, dstTableName), partitionLog)
 			colsToTypes[col.Name] = col.Type
 		}
 	}
 
 	if len(colsToTypes) > 0 {
-		tx, err := s.connector.database.Begin()
+		tx, err := s.database.Begin()
 		if err != nil {
 			return fmt.Errorf("failed to begin transaction: %w", err)
 		}
@@ -173,7 +171,7 @@ func (s *SnowflakeAvroSyncHandler) addMissingColumns(
 			alterTableCmd := fmt.Sprintf("ALTER TABLE %s ", dstTableName)
 			alterTableCmd += fmt.Sprintf("ADD COLUMN IF NOT EXISTS \"%s\" %s;", upperCasedColName, sfColType)
 
-			s.connector.logger.Info(fmt.Sprintf("altering destination table %s with command `%s`",
+			s.logger.Info(fmt.Sprintf("altering destination table %s with command `%s`",
 				dstTableName, alterTableCmd), partitionLog)
 
 			if _, err := tx.ExecContext(ctx, alterTableCmd); err != nil {
@@ -185,10 +183,10 @@ func (s *SnowflakeAvroSyncHandler) addMissingColumns(
 			return fmt.Errorf("failed to commit transaction: %w", err)
 		}
 
-		s.connector.logger.Info("successfully added missing columns to destination table "+
+		s.logger.Info("successfully added missing columns to destination table "+
 			dstTableName, partitionLog)
 	} else {
-		s.connector.logger.Info("no missing columns found in destination table "+dstTableName, partitionLog)
+		s.logger.Info("no missing columns found in destination table "+dstTableName, partitionLog)
 	}
 
 	return nil
@@ -203,12 +201,13 @@ func (s *SnowflakeAvroSyncHandler) getAvroSchema(
 		return nil, fmt.Errorf("failed to define Avro schema: %w", err)
 	}
 
-	s.connector.logger.Info(fmt.Sprintf("Avro schema: %v\n", avroSchema))
+	s.logger.Info(fmt.Sprintf("Avro schema: %v\n", avroSchema))
 	return avroSchema, nil
 }
 
 func (s *SnowflakeAvroSyncHandler) writeToAvroFile(
 	ctx context.Context,
+	env map[string]string,
 	stream *model.QRecordStream,
 	avroSchema *model.QRecordAvroSchemaDefinition,
 	partitionID string,
@@ -223,7 +222,7 @@ func (s *SnowflakeAvroSyncHandler) writeToAvroFile(
 	}
 	localFilePath := fmt.Sprintf("%s/%s.avro.zst", tmpDir, partitionID)
-	s.connector.logger.Info("writing records to local file " + localFilePath)
+	s.logger.Info("writing records to local file " + localFilePath)
 	avroFile, err := ocfWriter.WriteRecordsToAvroFile(ctx, localFilePath)
 	if err != nil {
 		return nil, fmt.Errorf("failed to write records to Avro file: %w", err)
 	}
@@ -238,14 +237,14 @@ func (s *SnowflakeAvroSyncHandler) writeToAvroFile(
 	}
 
 	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, s.config.FlowJobName, partitionID)
-	s.connector.logger.Info("OCF: Writing records to S3",
+	s.logger.Info("OCF: Writing records to S3",
 		slog.String(string(shared.PartitionIDKey), partitionID))
 
 	provider, err := utils.GetAWSCredentialsProvider(ctx, "snowflake", utils.PeerAWSCredentials{})
 	if err != nil {
 		return nil, err
 	}
-	avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, provider)
+	avroFile, err := ocfWriter.WriteRecordsToS3(ctx, env, s3o.Bucket, s3AvroFileKey, provider)
 	if err != nil {
 		return nil, fmt.Errorf("failed to write records to S3: %w", err)
 	}
@@ -258,16 +257,16 @@ func (s *SnowflakeAvroSyncHandler) putFileToStage(ctx context.Context, avroFile *avro.AvroFile, stage string) error {
 	if avroFile.StorageLocation != avro.AvroLocalStorage {
-		s.connector.logger.Info("no file to put to stage")
+		s.logger.Info("no file to put to stage")
 		return nil
 	}
 
 	putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage)
 
-	if _, err := s.connector.database.ExecContext(ctx, putCmd); err != nil {
+	if _, err := s.database.ExecContext(ctx, putCmd); err != nil {
 		return fmt.Errorf("failed to put file to stage: %w", err)
 	}
 
-	s.connector.logger.Info(fmt.Sprintf("put file %s to stage %s", avroFile.FilePath, stage))
+	s.logger.Info(fmt.Sprintf("put file %s to stage %s", avroFile.FilePath, stage))
 	return nil
 }
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index 7a400d78a7..06e3fb881e 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -423,8 +423,7 @@ func (c *SnowflakeConnector) SyncRecords(ctx context.Context, req *model.SyncRec
 		return nil, err
 	}
 
-	err = c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpointID)
-	if err != nil {
+	if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpointID); err != nil {
 		return nil, err
 	}
 
@@ -456,7 +455,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
 		return nil, err
 	}
 
-	numRecords, err := avroSyncer.SyncRecords(ctx, destinationTableSchema, stream, req.FlowJobName)
+	numRecords, err := avroSyncer.SyncRecords(ctx, req.Env, destinationTableSchema, stream, req.FlowJobName)
 	if err != nil {
 		return nil, err
 	}
diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go
index 6f193be88b..ee72e2c28b 100644
--- a/flow/connectors/utils/avro/avro_writer.go
+++ b/flow/connectors/utils/avro/avro_writer.go
@@ -23,6 +23,7 @@ import (
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
+	"github.com/PeerDB-io/peer-flow/peerdbenv"
 	"github.com/PeerDB-io/peer-flow/shared"
 )
 
@@ -187,7 +188,11 @@ func (p *peerDBOCFWriter) WriteOCF(ctx context.Context, w io.Writer) (int, error
 }
 
 func (p *peerDBOCFWriter) WriteRecordsToS3(
-	ctx context.Context, bucketName, key string, s3Creds utils.AWSCredentialsProvider,
+	ctx context.Context,
+	env map[string]string,
+	bucketName string,
+	key string,
+	s3Creds utils.AWSCredentialsProvider,
 ) (*AvroFile, error) {
 	logger := shared.LoggerFromCtx(ctx)
 	s3svc, err := utils.CreateS3Client(ctx, s3Creds)
@@ -215,12 +220,23 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(
 		numRows, writeOcfError = p.WriteOCF(ctx, w)
 	}()
 
-	_, err = manager.NewUploader(s3svc).Upload(ctx, &s3.PutObjectInput{
+	partSize, err := peerdbenv.PeerDBS3PartSize(ctx, env)
+	if err != nil {
+		return nil, fmt.Errorf("could not get s3 part size config: %w", err)
+	}
+
+	// Create the uploader using the AWS SDK v2 manager
+	uploader := manager.NewUploader(s3svc, func(u *manager.Uploader) {
+		if partSize > 0 {
+			u.PartSize = partSize
+		}
+	})
+
+	if _, err := uploader.Upload(ctx, &s3.PutObjectInput{
 		Bucket: aws.String(bucketName),
 		Key:    aws.String(key),
 		Body:   r,
-	})
-	if err != nil {
+	}); err != nil {
 		s3Path := "s3://" + bucketName + "/" + key
 		logger.Error("failed to upload file", slog.Any("error", err), slog.String("s3_path", s3Path))
 		return nil, fmt.Errorf("failed to upload file: %w", err)
diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go
index 1e2f225906..4810faf0ae 100644
--- a/flow/peerdbenv/dynamicconf.go
+++ b/flow/peerdbenv/dynamicconf.go
@@ -116,6 +116,14 @@ DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4);
 END;`,
 		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
 		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
 	},
+	{
+		Name:             "PEERDB_S3_PART_SIZE",
+		Description:      "S3 upload part size, may need to increase for large batches",
+		DefaultValue:     "0",
+		ValueType:        protos.DynconfValueType_INT,
+		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
+		TargetForSetting: protos.DynconfTarget_ALL,
+	},
 	{
 		Name:        "PEERDB_QUEUE_FORCE_TOPIC_CREATION",
 		Description: "Force auto topic creation in mirrors, applies to Kafka and PubSub mirrors",
@@ -340,6 +348,10 @@ func PeerDBClickHouseAWSS3BucketName(ctx context.Context, env map[string]string)
 	return dynLookup(ctx, env, "PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME")
 }
 
+func PeerDBS3PartSize(ctx context.Context, env map[string]string) (int64, error) {
+	return dynamicConfSigned[int64](ctx, env, "PEERDB_S3_PART_SIZE")
+}
+
 // Kafka has topic auto create as an option, auto.create.topics.enable
 // But non-dedicated cluster maybe can't set config, may want peerdb to create topic. Similar for PubSub
 func PeerDBQueueForceTopicCreation(ctx context.Context, env map[string]string) (bool, error) {
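For reference, the arithmetic behind the new knob: the AWS SDK v2 uploader splits an object into at most 10000 parts, so the default 5MiB part size caps a single upload at about 48.8GiB, while 500MiB parts raise the ceiling to roughly 4.8TiB. Below is a minimal standalone sketch, not part of the patch, of picking a part size that keeps a given object under that cap. It assumes only the `MaxUploadParts` and `DefaultUploadPartSize` constants exported by `github.com/aws/aws-sdk-go-v2/feature/s3/manager` (the package the avro_writer.go hunk already uses); the `partSizeFor` helper is hypothetical and for illustration, whereas the patch itself simply passes any positive `PEERDB_S3_PART_SIZE` value through to `Uploader.PartSize` and otherwise leaves the SDK default in place (the config defaults to "0").

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
)

// partSizeFor (hypothetical helper, illustration only) returns a part size in
// bytes large enough that an object of objectSize bytes fits within the SDK's
// MaxUploadParts limit (10000 parts), falling back to the SDK's 5MiB default
// when that is already sufficient.
func partSizeFor(objectSize int64) int64 {
	maxParts := int64(manager.MaxUploadParts)
	// Ceiling division so the final partial part is accounted for.
	needed := (objectSize + maxParts - 1) / maxParts
	if needed < manager.DefaultUploadPartSize {
		return manager.DefaultUploadPartSize
	}
	return needed
}

func main() {
	const gib = int64(1) << 30
	fmt.Println(partSizeFor(40*gib))  // 5242880: the 5MiB default still fits 40GiB
	fmt.Println(partSizeFor(200*gib)) // 21474837: 200GiB needs ~20.5MiB parts
}
```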