diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 8a8181ba61..cbd2c8afa5 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -19,10 +19,10 @@ x-flow-worker-env: &flow-worker-env # For GCS, these will be your HMAC keys instead # For more information: # https://cloud.google.com/storage/docs/authentication/managing-hmackeys - AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-} - AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-} + AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-AKIASB7EBZDCEVIMB4XH} + AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-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} # For GCS, set this to "auto" without the quotes - AWS_REGION: ${AWS_REGION:-} + AWS_REGION: ${AWS_REGION:-us-east-2} # For GCS, set this as: https://storage.googleapis.com AWS_ENDPOINT: ${AWS_ENDPOINT:-} # enables worker profiling using Grafana Pyroscope @@ -178,41 +178,40 @@ services: temporal-admin-tools: condition: service_healthy - peerdb: - container_name: peerdb-server - stop_signal: SIGINT - build: - context: . - dockerfile: stacks/peerdb-server.Dockerfile - environment: - <<: *catalog-config - PEERDB_LOG_DIR: /var/log/peerdb - PEERDB_PASSWORD: peerdb - PEERDB_FLOW_SERVER_ADDRESS: grpc://flow_api:8112 - RUST_LOG: info - RUST_BACKTRACE: 1 - ports: - - 9900:9900 - depends_on: - catalog: - condition: service_healthy - - peerdb-ui: - container_name: peerdb-ui - build: - context: . - dockerfile: stacks/peerdb-ui.Dockerfile - ports: - - 3000:3000 - environment: - <<: *catalog-config - DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres - PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113 - PEERDB_PASSWORD: - NEXTAUTH_SECRET: __changeme__ - NEXTAUTH_URL: http://localhost:3000 - depends_on: - - flow-api + # peerdb: + # container_name: peerdb-server + # stop_signal: SIGINT + # build: + # context: . + # dockerfile: stacks/peerdb-server.Dockerfile + # environment: + # <<: *catalog-config + # PEERDB_LOG_DIR: /var/log/peerdb + # PEERDB_PASSWORD: peerdb + # PEERDB_FLOW_SERVER_ADDRESS: grpc://flow_api:8112 + # RUST_LOG: info + # RUST_BACKTRACE: 1 + # ports: + # - 9900:9900 + # depends_on: + # catalog: + # condition: service_healthy + # peerdb-ui: + # container_name: peerdb-ui + # build: + # context: . + # dockerfile: stacks/peerdb-ui.Dockerfile + # ports: + # - 3000:3000 + # environment: + # <<: *catalog-config + # DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres + # PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113 + # PEERDB_PASSWORD: + # NEXTAUTH_SECRET: __changeme__ + # NEXTAUTH_URL: http://localhost:3000 + # depends_on: + # - flow-api volumes: pgdata: diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index a6224da626..a55adba96a 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -371,6 +371,7 @@ func (a *FlowableActivity) StartNormalize( ) (*model.NormalizeResponse, error) { conn := input.FlowConnectionConfigs ctx = context.WithValue(ctx, shared.FlowNameKey, conn.FlowJobName) + fmt.Printf("\n*********************** in StartNormalize %+v\n", conn) dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination) if errors.Is(err, connectors.ErrUnsupportedFunctionality) { dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 1cee1fd5fc..e4541a61ae 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -120,6 +120,7 @@ func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context, func (h *FlowRequestHandler) CreateCDCFlow( ctx context.Context, req *protos.CreateCDCFlowRequest, ) (*protos.CreateCDCFlowResponse, error) { + fmt.Printf("\n******************************** CreateCDCFlow") cfg := req.ConnectionConfigs _, validateErr := h.ValidateCDCMirror(ctx, req) if validateErr != nil { @@ -230,6 +231,7 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog( func (h *FlowRequestHandler) CreateQRepFlow( ctx context.Context, req *protos.CreateQRepFlowRequest, ) (*protos.CreateQRepFlowResponse, error) { + fmt.Printf("\n******************************** CreateQRepFlow") cfg := req.QrepConfig workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 0e9b6a3d5f..28dfe100c2 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -12,7 +12,6 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/jackc/pgx/v5/pgtype" ) @@ -75,15 +74,15 @@ func (c *ClickhouseConnector) CreateRawTable(req *protos.CreateRawTableInput) (* // } createRawTableSQL := `CREATE TABLE IF NOT EXISTS %s ( - _PEERDB_UID STRING NOT NULL, - _PEERDB_TIMESTAMP INT NOT NULL, - _PEERDB_DESTINATION_TABLE_NAME STRING NOT NULL, - _PEERDB_DATA STRING NOT NULL, - _PEERDB_RECORD_TYPE INTEGER NOT NULL, - _PEERDB_MATCH_DATA STRING, - _PEERDB_BATCH_ID INT, - _PEERDB_UNCHANGED_TOAST_COLUMNS STRING - ) ENGINE = ReplacingMergeTree ORDER BY _PEERDB_UID;` + _peerdb_uid String NOT NULL, + _peerdb_timestamp Int64 NOT NULL, + _peerdb_destination_table_name String NOT NULL, + _peerdb_data String NOT NULL, + _peerdb_record_type Int NOT NULL, + _peerdb_match_data String, + _peerdb_batch_id Int, + _peerdb_unchanged_toast_columns String + ) ENGINE = ReplacingMergeTree ORDER BY _peerdb_uid;` _, err := c.database.ExecContext(c.ctx, fmt.Sprintf(createRawTableSQL, rawTableName)) @@ -95,11 +94,11 @@ func (c *ClickhouseConnector) CreateRawTable(req *protos.CreateRawTableInput) (* // return nil, fmt.Errorf("unable to commit transaction for creation of raw table: %w", err) // } - stage := c.getStageNameForJob(req.FlowJobName) - err = c.createStage(stage, &protos.QRepConfig{}) - if err != nil { - return nil, err - } + // stage := c.getStageNameForJob(req.FlowJobName) + // err = c.createStage(stage, &protos.QRepConfig{}) + // if err != nil { + // return nil, err + // } return &protos.CreateRawTableOutput{ TableIdentifier: rawTableName, @@ -114,12 +113,15 @@ func (c *ClickhouseConnector) syncRecordsViaAvro( tableNameRowsMapping := make(map[string]uint32) streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID) streamRes, err := utils.RecordsToRawTableStream(streamReq) + //x := *&streamRes.Stream + //y := (*x).Records + fmt.Printf("\n*******************############################## cdc.go in syncRecordsViaAvro streamRes: %+v", streamRes) if err != nil { return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err) } qrepConfig := &protos.QRepConfig{ - StagingPath: "", + StagingPath: c.config.S3Integration, FlowJobName: req.FlowJobName, DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s", rawTableIdentifier)), @@ -157,8 +159,11 @@ func (c *ClickhouseConnector) syncRecordsViaAvro( } func (c *ClickhouseConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { - rawTableName := getRawTableName(req.FlowJobName) - c.logger.Info(fmt.Sprintf("pushing records to Snowflake table %s", rawTableName)) + fmt.Printf("\n ******************************************** !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! in ClickhouseConnector.SyncRecords") + fmt.Printf("\n ******************************* in cdc.go in SyncRecords config: %+v", c.config.S3Integration) + //c.config.S3Integration = "s3://avro-clickhouse" + rawTableName := c.getRawTableName(req.FlowJobName) + c.logger.Info(fmt.Sprintf("pushing records to Clickhouse table %s", rawTableName)) syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) if err != nil { @@ -186,20 +191,36 @@ func (c *ClickhouseConnector) SyncRecords(req *model.SyncRecordsRequest) (*model // }() // updating metadata with new offset and syncBatchID - err = c.updateSyncMetadata(req.FlowJobName, res.LastSyncedCheckPointID, syncBatchID, syncRecordsTx) + // err = c.updateSyncMetadata(req.FlowJobName, res.LastSyncedCheckPointID, syncBatchID, syncRecordsTx) + // if err != nil { + // return nil, err + // } + // transaction commits + // err = syncRecordsTx.Commit() + // if err != nil { + // return nil, err + // } + + lastCheckpoint, err := req.Records.GetLastCheckpoint() if err != nil { + return nil, fmt.Errorf("failed to get last checkpoint: %w", err) + } + + err = c.SetLastOffset(req.FlowJobName, lastCheckpoint) + if err != nil { + c.logger.Error("failed to update last offset for s3 cdc", slog.Any("error", err)) return nil, err } - // transaction commits - err = syncRecordsTx.Commit() + err = c.pgMetadata.IncrementID(req.FlowJobName) if err != nil { + c.logger.Error("failed to increment id", slog.Any("error", err)) return nil, err } return res, nil } -func (c *SnowflakeConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bool, error) { +func (c *ClickhouseConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bool, error) { checkIfJobMetadataExistsSQL := "SELECT TO_BOOLEAN(COUNT(1)) FROM %s WHERE MIRROR_JOB_NAME=?" var result pgtype.Bool @@ -211,75 +232,78 @@ func (c *SnowflakeConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bo return result.Bool, nil } -func (c *ClickhouseConnector) updateSyncMetadata(flowJobName string, lastCP int64, - syncBatchID int64, syncRecordsTx *sql.Tx, -) error { - jobMetadataExists, err := c.jobMetadataExistsTx(syncRecordsTx, flowJobName) - if err != nil { - return fmt.Errorf("failed to get sync status for flow job: %w", err) - } - - if !jobMetadataExists { - _, err := syncRecordsTx.ExecContext(c.ctx, - fmt.Sprintf(insertJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier), - flowJobName, lastCP, syncBatchID, 0) - if err != nil { - return fmt.Errorf("failed to insert flow job status: %w", err) - } - } else { - _, err := syncRecordsTx.ExecContext(c.ctx, - fmt.Sprintf(updateMetadataForSyncRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier), - lastCP, syncBatchID, flowJobName) - if err != nil { - return fmt.Errorf("failed to update flow job status: %w", err) - } - } - - return nil -} +// func (c *ClickhouseConnector) updateSyncMetadata(flowJobName string, lastCP int64, +// syncBatchID int64, syncRecordsTx *sql.Tx, +// ) error { +// jobMetadataExists, err := c.jobMetadataExistsTx(syncRecordsTx, flowJobName) +// if err != nil { +// return fmt.Errorf("failed to get sync status for flow job: %w", err) +// } + +// if !jobMetadataExists { +// _, err := syncRecordsTx.ExecContext(c.ctx, +// fmt.Sprintf(insertJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier), +// flowJobName, lastCP, syncBatchID, 0) +// if err != nil { +// return fmt.Errorf("failed to insert flow job status: %w", err) +// } +// } else { +// _, err := syncRecordsTx.ExecContext(c.ctx, +// fmt.Sprintf(updateMetadataForSyncRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier), +// lastCP, syncBatchID, flowJobName) +// if err != nil { +// return fmt.Errorf("failed to update flow job status: %w", err) +// } +// } + +// return nil +// } func (c *ClickhouseConnector) SyncFlowCleanup(jobName string) error { - syncFlowCleanupTx, err := c.database.BeginTx(c.ctx, nil) - if err != nil { - return fmt.Errorf("unable to begin transaction for sync flow cleanup: %w", err) - } - defer func() { - deferErr := syncFlowCleanupTx.Rollback() - if deferErr != sql.ErrTxDone && deferErr != nil { - c.logger.Error("error while rolling back transaction for flow cleanup", slog.Any("error", deferErr)) - } - }() - - row := syncFlowCleanupTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, c.metadataSchema) - var schemaExists pgtype.Bool - err = row.Scan(&schemaExists) - if err != nil { - return fmt.Errorf("unable to check if internal schema exists: %w", err) - } + // syncFlowCleanupTx, err := c.database.BeginTx(c.ctx, nil) + // if err != nil { + // return fmt.Errorf("unable to begin transaction for sync flow cleanup: %w", err) + // } + // defer func() { + // deferErr := syncFlowCleanupTx.Rollback() + // if deferErr != sql.ErrTxDone && deferErr != nil { + // c.logger.Error("error while rolling back transaction for flow cleanup", slog.Any("error", deferErr)) + // } + // }() - if schemaExists.Bool { - _, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, c.metadataSchema, - getRawTableIdentifier(jobName))) - if err != nil { - return fmt.Errorf("unable to drop raw table: %w", err) - } - _, err = syncFlowCleanupTx.ExecContext(c.ctx, - fmt.Sprintf(deleteJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) - if err != nil { - return fmt.Errorf("unable to delete job metadata: %w", err) - } - } + // row := syncFlowCleanupTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, c.metadataSchema) + // var schemaExists pgtype.Bool + // err = row.Scan(&schemaExists) + // if err != nil { + // return fmt.Errorf("unable to check if internal schema exists: %w", err) + // } - err = syncFlowCleanupTx.Commit() - if err != nil { - return fmt.Errorf("unable to commit transaction for sync flow cleanup: %w", err) - } + // if schemaExists.Bool { + // _, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, c.metadataSchema, + // getRawTableIdentifier(jobName))) + // if err != nil { + // return fmt.Errorf("unable to drop raw table: %w", err) + // } + // _, err = syncFlowCleanupTx.ExecContext(c.ctx, + // fmt.Sprintf(deleteJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) + // if err != nil { + // return fmt.Errorf("unable to delete job metadata: %w", err) + // } + // } - err = c.dropStage("", jobName) + // err = syncFlowCleanupTx.Commit() + // if err != nil { + // return fmt.Errorf("unable to commit transaction for sync flow cleanup: %w", err) + // } + + // err = c.dropStage("", jobName) + // if err != nil { + // return err + // } + err := c.pgMetadata.DropMetadata(jobName) if err != nil { return err } - return nil } @@ -288,53 +312,6 @@ func (c *ClickhouseConnector) SyncFlowCleanup(jobName string) error { func (c *ClickhouseConnector) ReplayTableSchemaDeltas(flowJobName string, schemaDeltas []*protos.TableSchemaDelta, ) error { - if len(schemaDeltas) == 0 { - return nil - } - - tableSchemaModifyTx, err := c.database.Begin() - if err != nil { - return fmt.Errorf("error starting transaction for schema modification: %w", - err) - } - defer func() { - deferErr := tableSchemaModifyTx.Rollback() - if deferErr != sql.ErrTxDone && deferErr != nil { - c.logger.Error("error rolling back transaction for table schema modification", slog.Any("error", deferErr)) - } - }() - - for _, schemaDelta := range schemaDeltas { - if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 { - continue - } - - for _, addedColumn := range schemaDelta.AddedColumns { - sfColtype, err := qValueKindToSnowflakeType(qvalue.QValueKind(addedColumn.ColumnType)) - if err != nil { - return fmt.Errorf("failed to convert column type %s to snowflake type: %w", - addedColumn.ColumnType, err) - } - _, err = tableSchemaModifyTx.ExecContext(c.ctx, - fmt.Sprintf("ALTER TABLE %s ADD COLUMN IF NOT EXISTS \"%s\" %s", - schemaDelta.DstTableName, strings.ToUpper(addedColumn.ColumnName), sfColtype)) - if err != nil { - return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName, - schemaDelta.DstTableName, err) - } - c.logger.Info(fmt.Sprintf("[schema delta replay] added column %s with data type %s", addedColumn.ColumnName, - addedColumn.ColumnType), - slog.String("destination table name", schemaDelta.DstTableName), - slog.String("source table name", schemaDelta.SrcTableName)) - } - } - - err = tableSchemaModifyTx.Commit() - if err != nil { - return fmt.Errorf("failed to commit transaction for table schema modification: %w", - err) - } - return nil } @@ -353,6 +330,12 @@ func (c *ClickhouseConnector) SetupMetadataTables() error { return nil } +// func (c *ClickhouseConnector) SetupNormalizedTables( +// req *protos.SetupNormalizedTableBatchInput, +// ) (*protos.SetupNormalizedTableBatchOutput, error) { +// return nil, nil +// } + func (c *ClickhouseConnector) GetLastSyncBatchID(jobName string) (int64, error) { return c.pgMetadata.GetLastBatchID(jobName) } @@ -371,3 +354,11 @@ func (c *ClickhouseConnector) SetLastOffset(jobName string, offset int64) error return nil } + +// func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { +// return &model.NormalizeResponse{ +// Done: true, +// StartBatchID: 1, +// EndBatchID: 1, +// }, nil +// } diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 84cc179b1a..3dba192bb1 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -30,8 +30,9 @@ func NewClickhouseConnector(ctx context.Context, return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err) } + metadataSchemaName := "peerdb_s3_metadata" // #nosec G101 pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, - config.GetMetadataDb(), metadataSchemaName) + clickhouseProtoConfig.GetMetadataDb(), metadataSchemaName) if err != nil { slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err)) return nil, err diff --git a/flow/connectors/clickhouse/qrep.go b/flow/connectors/clickhouse/qrep.go index 74ffe26524..8d07b06449 100644 --- a/flow/connectors/clickhouse/qrep.go +++ b/flow/connectors/clickhouse/qrep.go @@ -24,6 +24,7 @@ func (c *ClickhouseConnector) SyncQRepRecords( partition *protos.QRepPartition, stream *model.QRecordStream, ) (int, error) { + fmt.Printf("\n******************* in ClickhouseConnector.SyncQRepRecords") // Ensure the destination table is available. destTable := config.DestinationTableIdentifier flowLog := slog.Group("sync_metadata", diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index 0b1d02f6d1..d64bdea54c 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -30,26 +30,50 @@ func NewClickhouseAvroSyncMethod( } } -func (s *ClickhouseAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage string) error { - if avroFile.StorageLocation != avro.AvroLocalStorage { - s.connector.logger.Info("no file to put to stage") - return nil +// func (s *ClickhouseAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage string) error { +// if avroFile.StorageLocation != avro.AvroLocalStorage { +// s.connector.logger.Info("no file to put to stage") +// return nil +// } + +// activity.RecordHeartbeat(s.connector.ctx, "putting file to stage") +// putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage) + +// shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string { +// return fmt.Sprintf("putting file to stage %s", stage) +// }) +// defer shutdown() + +// if _, err := s.connector.database.ExecContext(s.connector.ctx, putCmd); err != nil { +// return fmt.Errorf("failed to put file to stage: %w", err) +// } + +// s.connector.logger.Info(fmt.Sprintf("put file %s to stage %s", avroFile.FilePath, stage)) +// return nil +// } + +func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(avroFile *avro.AvroFile) error { + fmt.Printf("\n************************* in CopyStageToDesti stagingPath: %+v", s.config.StagingPath) + stagingPath := s.config.StagingPath //"s3://avro-clickhouse" + s3o, err := utils.NewS3BucketAndPrefix(stagingPath) + if err != nil { + return err } + awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{}) + avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath) - activity.RecordHeartbeat(s.connector.ctx, "putting file to stage") - putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage) + if err != nil { + return err + } + //nolint:gosec + query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')", + s.config.DestinationTableIdentifier, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey) - shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string { - return fmt.Sprintf("putting file to stage %s", stage) - }) - defer shutdown() + fmt.Printf("\n************************ CopyStagingToDestination query: %s\n", query) - if _, err := s.connector.database.ExecContext(s.connector.ctx, putCmd); err != nil { - return fmt.Errorf("failed to put file to stage: %w", err) - } + _, err = s.connector.database.Exec(query) - s.connector.logger.Info(fmt.Sprintf("put file %s to stage %s", avroFile.FilePath, stage)) - return nil + return err } func (s *ClickhouseAvroSyncMethod) SyncRecords( @@ -57,6 +81,9 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords( stream *model.QRecordStream, flowJobName string, ) (int, error) { + fmt.Printf("\n************************* in qrep_avro_sync: SyncRecords1 dstTableSchema %+v", dstTableSchema) + fmt.Printf("\n************************ in qrep_avro_sync: SyncRecords2 config %+v", s.config) + //s.config.StagingPath = "s3://avro-clickhouse" tableLog := slog.String("destinationTable", s.config.DestinationTableIdentifier) dstTableName := s.config.DestinationTableIdentifier @@ -65,6 +92,8 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords( return -1, fmt.Errorf("failed to get schema from stream: %w", err) } + fmt.Printf("\n******************************* in qrep_avro_sync: SyncRecords3 stream schema %+v", schema) + s.connector.logger.Info("sync function called and schema acquired", tableLog) avroSchema, err := s.getAvroSchema(dstTableName, schema) @@ -72,38 +101,49 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords( return 0, err } + fmt.Printf("\n******************************* in qrep_avro_sync: SyncRecords5 avro schema %+v", avroSchema) + partitionID := shared.RandomString(16) + fmt.Printf("\n******************* calling writeToAvroFile partitionId: %+v", partitionID) avroFile, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName) + fmt.Printf("\n******************* records written to avrofile %+v", avroFile) if err != nil { return 0, err } defer avroFile.Cleanup() s.connector.logger.Info(fmt.Sprintf("written %d records to Avro file", avroFile.NumRecords), tableLog) - stage := s.connector.getStageNameForJob(s.config.FlowJobName) - err = s.connector.createStage(stage, s.config) + // stage := s.connector.getStageNameForJob(s.config.FlowJobName) + // err = s.connector.createStage(stage, s.config) + // if err != nil { + // return 0, err + // } + // s.connector.logger.Info(fmt.Sprintf("Created stage %s", stage)) + + // colNames, _, err := s.connector.getColsFromTable(s.config.DestinationTableIdentifier) + // if err != nil { + // return 0, err + // } + + // err = s.putFileToStage(avroFile, "stage") + // if err != nil { + // return 0, err + // } + // s.connector.logger.Info("pushed avro file to stage", tableLog) + + // err = CopyStageToDestination(s.connector, s.config, s.config.DestinationTableIdentifier, stage, colNames) + // if err != nil { + // return 0, err + // } + // s.connector.logger.Info(fmt.Sprintf("copying records into %s from stage %s", + // s.config.DestinationTableIdentifier, stage)) + + //Copy stage/avro to destination + err = s.CopyStageToDestination(avroFile) + fmt.Printf("\n ***************** in qrep_avro_sync: SyncRecords after CopyStageToDestination err: %+v", err) if err != nil { return 0, err } - s.connector.logger.Info(fmt.Sprintf("Created stage %s", stage)) - - colNames, _, err := s.connector.getColsFromTable(s.config.DestinationTableIdentifier) - if err != nil { - return 0, err - } - - err = s.putFileToStage(avroFile, stage) - if err != nil { - return 0, err - } - s.connector.logger.Info("pushed avro file to stage", tableLog) - - err = CopyStageToDestination(s.connector, s.config, s.config.DestinationTableIdentifier, stage, colNames) - if err != nil { - return 0, err - } - s.connector.logger.Info(fmt.Sprintf("copying records into %s from stage %s", - s.config.DestinationTableIdentifier, stage)) return avroFile.NumRecords, nil } @@ -114,9 +154,10 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( dstTableSchema []*sql.ColumnType, stream *model.QRecordStream, ) (int, error) { + fmt.Printf("\n************************* in SyncQRepRecords 1") startTime := time.Now() dstTableName := config.DestinationTableIdentifier - // s.config.StagingPath = "s3://avro-clickhouse" + //s.config.StagingPath = "s3://avro-clickhouse" schema, err := stream.Schema() if err != nil { @@ -140,6 +181,8 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{}) avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath) + fmt.Printf("\n*********************** in qrep_avro_sync SyncQRepRecords 4 avroFileUrl: %+v", avroFileUrl) + if err != nil { return 0, err } @@ -147,7 +190,12 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')", config.DestinationTableIdentifier, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey) + fmt.Printf("\n************************************ in qrep_avro_sync SyncQRepRecords 5 query: %s\n", query) + _, err = s.connector.database.Exec(query) + + fmt.Printf("\n************************************ in qrep_avro_sync SyncQRepRecords 6 err: %+v\n", err) + if err != nil { return 0, err } @@ -179,15 +227,18 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile( partitionID string, flowJobName string, ) (*avro.AvroFile, error) { + stagingPath := s.config.StagingPath //"s3://avro-clickhouse" + fmt.Printf("\n****************************************** StagingPath: %+v*****\n", s.config.StagingPath) ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd, qvalue.QDWHTypeClickhouse) - s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath) + s3o, err := utils.NewS3BucketAndPrefix(stagingPath) if err != nil { return nil, fmt.Errorf("failed to parse staging path: %w", err) } s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{}) ///utils.S3PeerCredentials{}) + fmt.Printf("\n************************* writeToAvroFile 2 avroFile %+v, err: %+v", avroFile, err) if err != nil { return nil, fmt.Errorf("failed to write records to S3: %w", err) } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 8abc6f3bf8..e7e2974035 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -157,7 +157,10 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne return conneventhub.NewEventHubConnector(ctx, config.GetEventhubGroupConfig()) case *protos.Peer_S3Config: return conns3.NewS3Connector(ctx, config.GetS3Config()) + case *protos.Peer_ClickhouseConfig: + return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig()) default: + fmt.Printf("\n*********************** in GetCDCSyncConnector not found %+v %T\n", inner, inner) return nil, ErrUnsupportedFunctionality } } @@ -173,7 +176,10 @@ func GetCDCNormalizeConnector(ctx context.Context, return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig()) case *protos.Peer_SnowflakeConfig: return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig()) + case *protos.Peer_ClickhouseConfig: + return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig()) default: + fmt.Printf("\n*********************** in GetCDCNormalizeConnector not found %+v %T\n", inner, inner) return nil, ErrUnsupportedFunctionality } } diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index b26aacf637..48f059b607 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -98,8 +98,10 @@ func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { func (p *PostgresMetadataStore) SetupMetadata() error { // create the schema + fmt.Printf("\n********** SetupMetadata 1, schema name %+v", p.schemaName) _, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName) if err != nil && !utils.IsUniqueError(err) { + fmt.Printf("********** error in SetupMetadata %+v", err) p.logger.Error("failed to create schema", slog.Any("error", err)) return err } diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 2fadcbbe62..6cfd7c5675 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -35,6 +35,8 @@ type QRepPartitionFlowExecution struct { // returns a new empty QRepFlowState func NewQRepFlowState() *protos.QRepFlowState { + fmt.Printf("\n*****************************NewQRepFlowState") + return &protos.QRepFlowState{ LastPartition: &protos.QRepPartition{ PartitionId: "not-applicable-partition", @@ -48,6 +50,8 @@ func NewQRepFlowState() *protos.QRepFlowState { // returns a new empty QRepFlowState func NewQRepFlowStateForTesting() *protos.QRepFlowState { + fmt.Printf("\n*****************************NewQRepFlowStateForTesting") + return &protos.QRepFlowState{ LastPartition: &protos.QRepPartition{ PartitionId: "not-applicable-partition", @@ -61,6 +65,8 @@ func NewQRepFlowStateForTesting() *protos.QRepFlowState { // NewQRepFlowExecution creates a new instance of QRepFlowExecution. func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *QRepFlowExecution { + fmt.Printf("\n*****************************NewQRepFlowExecution") + return &QRepFlowExecution{ config: config, flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID, @@ -75,6 +81,8 @@ func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUU func NewQRepPartitionFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string, ) *QRepPartitionFlowExecution { + fmt.Printf("\n*****************************NewQRepPartitionFlowExecution") + return &QRepPartitionFlowExecution{ config: config, flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID, @@ -410,6 +418,8 @@ func QRepFlowWorkflow( config *protos.QRepConfig, state *protos.QRepFlowState, ) error { + fmt.Printf("\n*****************************QRepFlowWorkflow") + // The structure of this workflow is as follows: // 1. Start the loop to continuously run the replication flow. // 2. In the loop, query the source database to get the partitions to replicate. @@ -547,6 +557,8 @@ func QRepPartitionWorkflow( partitions *protos.QRepPartitionBatch, runUUID string, ) error { + fmt.Printf("\n*****************************QRepPartitionWorkflow") + ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) q := NewQRepPartitionFlowExecution(ctx, config, runUUID) return q.ReplicatePartitions(ctx, partitions) diff --git a/protos/peers.proto b/protos/peers.proto index b16c35fccd..58c7a6b9fe 100644 --- a/protos/peers.proto +++ b/protos/peers.proto @@ -96,6 +96,7 @@ message ClickhouseConfig{ string password = 4; string database = 5; string s3_integration = 6; // staging to store avro files + PostgresConfig metadata_db = 7; } message SqlServerConfig { diff --git a/ui/app/api/peers/getTruePeer.ts b/ui/app/api/peers/getTruePeer.ts index 1af4155dec..75aa53902b 100644 --- a/ui/app/api/peers/getTruePeer.ts +++ b/ui/app/api/peers/getTruePeer.ts @@ -8,6 +8,7 @@ import { S3Config, SnowflakeConfig, SqlServerConfig, + ClickhouseConfig, } from '@/grpc_generated/peers'; export const getTruePeer = (peer: CatalogPeer) => { @@ -23,7 +24,8 @@ export const getTruePeer = (peer: CatalogPeer) => { | EventHubConfig | S3Config | SqlServerConfig - | EventHubGroupConfig; + | EventHubGroupConfig + | ClickhouseConfig; switch (peer.type) { case 0: config = BigqueryConfig.decode(options); @@ -53,6 +55,10 @@ export const getTruePeer = (peer: CatalogPeer) => { config = EventHubGroupConfig.decode(options); newPeer.eventhubGroupConfig = config; break; + case 8: + config = ClickhouseConfig.decode(options); + newPeer.clickhouseConfig = config; + break; default: return newPeer; }