Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE #2256

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 56 additions & 16 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"golang.org/x/sync/errgroup"

"github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
Expand Down Expand Up @@ -262,8 +265,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
}, nil
}

err = c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID)
if err != nil {
if err := c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID); err != nil {
return nil, fmt.Errorf("failed to copy avro stages to destination: %w", err)
}

Expand All @@ -278,9 +280,48 @@ func (c *ClickHouseConnector) NormalizeRecords(
return nil, err
}

enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
if err != nil {
return nil, err
}

parallelNormalize, err := peerdbenv.PeerDBClickHouseParallelNormalize(ctx, req.Env)
if err != nil {
return nil, err
}
parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames))
if parallelNormalize > 1 {
c.logger.Info("normalizing in parallel", slog.Int("connections", parallelNormalize))
}

queries := make(chan string)
rawTbl := c.getRawTableName(req.FlowJobName)

// model the raw table data as inserts.
group, errCtx := errgroup.WithContext(ctx)
for i := range parallelNormalize {
group.Go(func() error {
var chConn clickhouse.Conn
if i == 0 {
chConn = c.database
} else {
var err error
chConn, err = Connect(errCtx, req.Env, c.config)
if err != nil {
return err
}
defer chConn.Close()
}

for query := range queries {
c.logger.Info("normalizing batch", slog.String("query", query))
if err := chConn.Exec(errCtx, query); err != nil {
return fmt.Errorf("error while inserting into normalized table: %w", err)
}
}
return nil
})
}

for _, tbl := range destinationTableNames {
// SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id
selectQuery := strings.Builder{}
Expand All @@ -299,11 +340,6 @@ func (c *ClickHouseConnector) NormalizeRecords(
}
}

enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
if err != nil {
return nil, err
}

projection := strings.Builder{}
projectionUpdate := strings.Builder{}

Expand Down Expand Up @@ -338,6 +374,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
var err error
clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
if err != nil {
close(queries)
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}
}
Expand Down Expand Up @@ -433,15 +470,19 @@ func (c *ClickHouseConnector) NormalizeRecords(
insertIntoSelectQuery.WriteString(colSelector.String())
insertIntoSelectQuery.WriteString(selectQuery.String())

q := insertIntoSelectQuery.String()

if err := c.execWithLogging(ctx, q); err != nil {
return nil, fmt.Errorf("error while inserting into normalized table: %w", err)
select {
case queries <- insertIntoSelectQuery.String():
case <-errCtx.Done():
close(queries)
return nil, ctx.Err()
}
}
close(queries)
if err := group.Wait(); err != nil {
return nil, err
}

err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
if err != nil {
if err := c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID); err != nil {
c.logger.Error("[clickhouse] error while updating normalize batch id", slog.Int64("BatchID", req.SyncBatchID), slog.Any("error", err))
return nil, err
}
Expand Down Expand Up @@ -510,8 +551,7 @@ func (c *ClickHouseConnector) copyAvroStagesToDestination(
ctx context.Context, flowJobName string, normBatchID, syncBatchID int64,
) error {
for s := normBatchID + 1; s <= syncBatchID; s++ {
err := c.copyAvroStageToDestination(ctx, flowJobName, s)
if err != nil {
if err := c.copyAvroStageToDestination(ctx, flowJobName, s); err != nil {
return fmt.Errorf("failed to copy avro stage to destination: %w", err)
}
}
Expand Down
24 changes: 18 additions & 6 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE",
Description: "Divide tables in batch into N insert selects. Helps distribute load to multiple nodes",
DefaultValue: "0",
ValueType: protos.DynconfValueType_INT,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES",
Description: "Duration in minutes since last normalize to start alerting, 0 disables all alerting entirely",
Expand Down Expand Up @@ -256,8 +264,8 @@ func dynamicConfSigned[T constraints.Signed](ctx context.Context, env map[string
return strconv.ParseInt(value, 10, 64)
})
if err != nil {
shared.LoggerFromCtx(ctx).Error("Failed to parse as int64", slog.Any("error", err))
return 0, fmt.Errorf("failed to parse as int64: %w", err)
shared.LoggerFromCtx(ctx).Error("Failed to parse as int64", slog.String("key", key), slog.Any("error", err))
return 0, fmt.Errorf("failed to parse %s as int64: %w", key, err)
}

return T(value), nil
Expand All @@ -268,8 +276,8 @@ func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, env map[st
return strconv.ParseUint(value, 10, 64)
})
if err != nil {
shared.LoggerFromCtx(ctx).Error("Failed to parse as uint64", slog.Any("error", err))
return 0, fmt.Errorf("failed to parse as uint64: %w", err)
shared.LoggerFromCtx(ctx).Error("Failed to parse as uint64", slog.String("key", key), slog.Any("error", err))
return 0, fmt.Errorf("failed to parse %s as uint64: %w", key, err)
}

return T(value), nil
Expand All @@ -278,8 +286,8 @@ func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, env map[st
func dynamicConfBool(ctx context.Context, env map[string]string, key string) (bool, error) {
value, err := dynLookupConvert(ctx, env, key, strconv.ParseBool)
if err != nil {
shared.LoggerFromCtx(ctx).Error("Failed to parse bool", slog.Any("error", err))
return false, fmt.Errorf("failed to parse bool: %w", err)
shared.LoggerFromCtx(ctx).Error("Failed to parse bool", slog.String("key", key), slog.Any("error", err))
return false, fmt.Errorf("failed to parse %s as bool: %w", key, err)
}

return value, nil
Expand Down Expand Up @@ -374,6 +382,10 @@ func PeerDBClickHouseMaxInsertThreads(ctx context.Context, env map[string]string
return dynamicConfSigned[int64](ctx, env, "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS")
}

// PeerDBClickHouseParallelNormalize reads the PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE
// dynamic setting, which controls how many insert-select connections the
// ClickHouse normalize step may use in parallel.
func PeerDBClickHouseParallelNormalize(ctx context.Context, env map[string]string) (int, error) {
	const key = "PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE"
	return dynamicConfSigned[int](ctx, env, key)
}

// PeerDBSnowflakeMergeParallelism reads the PEERDB_SNOWFLAKE_MERGE_PARALLELISM
// dynamic setting, which bounds how many merge statements Snowflake sync runs
// concurrently.
func PeerDBSnowflakeMergeParallelism(ctx context.Context, env map[string]string) (int64, error) {
	const key = "PEERDB_SNOWFLAKE_MERGE_PARALLELISM"
	return dynamicConfSigned[int64](ctx, env, key)
}
Expand Down
Loading