From db6949ac0742c2789852ad4639c01fe9da13768c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 9 Oct 2024 17:54:51 +0000 Subject: [PATCH] prototype: push clickhouse cdc directly --- flow/connectors/clickhouse/cdc.go | 35 +++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 1c0b651ad0..0eda824eaa 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -75,7 +75,7 @@ func (c *ClickHouseConnector) avroSyncMethod(flowJobName string) *ClickHouseAvro return NewClickHouseAvroSyncMethod(qrepConfig, c) } -func (c *ClickHouseConnector) syncRecordsViaAvro( +func (c *ClickHouseConnector) syncRecords( ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems], syncBatchID int64, @@ -87,14 +87,32 @@ func (c *ClickHouseConnector) syncRecordsViaAvro( return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err) } - avroSyncer := c.avroSyncMethod(req.FlowJobName) - numRecords, err := avroSyncer.SyncRecords(ctx, stream, req.FlowJobName, syncBatchID) + batch, err := c.database.PrepareBatch(ctx, "INSERT INTO "+c.getRawTableName(req.FlowJobName)) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to begin inserting batch: %w", err) + } + numRecords := 0 + row := make([]any, 0, 8) + for record := range stream.Records { + row = row[:0] + for _, qv := range record { + switch val := qv.(type) { + case qvalue.QValueString: + row = append(row, val.Val) + case qvalue.QValueInt64: + row = append(row, val.Val) + } + } + if err := batch.Append(row...); err != nil { + return nil, fmt.Errorf("failed to append to batch: %w", err) + } + numRecords += 1 + } + if err := batch.Send(); err != nil { + return nil, fmt.Errorf("failed to send batch: %w", err) } - err = c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas) - if err != nil { + if err := c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas); err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } @@ -108,13 +126,12 @@ func (c *ClickHouseConnector) syncRecordsViaAvro( } func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) { - res, err := c.syncRecordsViaAvro(ctx, req, req.SyncBatchID) + res, err := c.syncRecords(ctx, req, req.SyncBatchID) if err != nil { return nil, err } - err = c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpointID) - if err != nil { + if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpointID); err != nil { c.logger.Error("failed to increment id", slog.Any("error", err)) return nil, err }