Skip to content

Commit

Permalink
Merge branch 'main' into snapshot-disconnected-ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Nov 14, 2024
2 parents bee19f4 + 8c02a5e commit 37d0608
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 28 deletions.
12 changes: 10 additions & 2 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewClickHouseConnector(
config *protos.ClickhouseConfig,
) (*ClickHouseConnector, error) {
logger := shared.LoggerFromCtx(ctx)
database, err := Connect(ctx, config)
database, err := Connect(ctx, env, config)
if err != nil {
return nil, fmt.Errorf("failed to open connection to ClickHouse peer: %w", err)
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func NewClickHouseConnector(
return connector, nil
}

func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.Conn, error) {
func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (clickhouse.Conn, error) {
var tlsSetting *tls.Config
if !config.DisableTls {
tlsSetting = &tls.Config{MinVersion: tls.VersionTLS13}
Expand All @@ -228,6 +228,13 @@ func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.C
tlsSetting.RootCAs = caPool
}

var settings clickhouse.Settings
if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil {
return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err)
} else if maxInsertThreads != 0 {
settings = clickhouse.Settings{"max_insert_threads": maxInsertThreads}
}

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)},
Auth: clickhouse.Auth{
Expand All @@ -245,6 +252,7 @@ func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.C
{Name: "peerdb"},
},
},
Settings: settings,
DialTimeout: 3600 * time.Second,
ReadTimeout: 3600 * time.Second,
})
Expand Down
72 changes: 56 additions & 16 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"golang.org/x/sync/errgroup"

"github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
Expand Down Expand Up @@ -262,8 +265,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
}, nil
}

err = c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID)
if err != nil {
if err := c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID); err != nil {
return nil, fmt.Errorf("failed to copy avro stages to destination: %w", err)
}

Expand All @@ -278,9 +280,48 @@ func (c *ClickHouseConnector) NormalizeRecords(
return nil, err
}

enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
if err != nil {
return nil, err
}

parallelNormalize, err := peerdbenv.PeerDBClickHouseParallelNormalize(ctx, req.Env)
if err != nil {
return nil, err
}
parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames))
if parallelNormalize > 1 {
c.logger.Info("normalizing in parallel", slog.Int("connections", parallelNormalize))
}

queries := make(chan string)
rawTbl := c.getRawTableName(req.FlowJobName)

// model the raw table data as inserts.
group, errCtx := errgroup.WithContext(ctx)
for i := range parallelNormalize {
group.Go(func() error {
var chConn clickhouse.Conn
if i == 0 {
chConn = c.database
} else {
var err error
chConn, err = Connect(errCtx, req.Env, c.config)
if err != nil {
return err
}
defer chConn.Close()
}

for query := range queries {
c.logger.Info("normalizing batch", slog.String("query", query))
if err := chConn.Exec(errCtx, query); err != nil {
return fmt.Errorf("error while inserting into normalized table: %w", err)
}
}
return nil
})
}

for _, tbl := range destinationTableNames {
// SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id
selectQuery := strings.Builder{}
Expand All @@ -299,11 +340,6 @@ func (c *ClickHouseConnector) NormalizeRecords(
}
}

enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
if err != nil {
return nil, err
}

projection := strings.Builder{}
projectionUpdate := strings.Builder{}

Expand Down Expand Up @@ -338,6 +374,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
var err error
clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
if err != nil {
close(queries)
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}
}
Expand Down Expand Up @@ -433,15 +470,19 @@ func (c *ClickHouseConnector) NormalizeRecords(
insertIntoSelectQuery.WriteString(colSelector.String())
insertIntoSelectQuery.WriteString(selectQuery.String())

q := insertIntoSelectQuery.String()

if err := c.execWithLogging(ctx, q); err != nil {
return nil, fmt.Errorf("error while inserting into normalized table: %w", err)
select {
case queries <- insertIntoSelectQuery.String():
case <-errCtx.Done():
close(queries)
return nil, ctx.Err()
}
}
close(queries)
if err := group.Wait(); err != nil {
return nil, err
}

err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
if err != nil {
if err := c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID); err != nil {
c.logger.Error("[clickhouse] error while updating normalize batch id", slog.Int64("BatchID", req.SyncBatchID), slog.Any("error", err))
return nil, err
}
Expand Down Expand Up @@ -510,8 +551,7 @@ func (c *ClickHouseConnector) copyAvroStagesToDestination(
ctx context.Context, flowJobName string, normBatchID, syncBatchID int64,
) error {
for s := normBatchID + 1; s <= syncBatchID; s++ {
err := c.copyAvroStageToDestination(ctx, flowJobName, s)
if err != nil {
if err := c.copyAvroStageToDestination(ctx, flowJobName, s); err != nil {
return fmt.Errorf("failed to copy avro stage to destination: %w", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s ClickHouseSuite) Teardown() {
}

func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error) {
ch, err := connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig())
ch, err := connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -203,7 +203,7 @@ func SetupSuite(t *testing.T) ClickHouseSuite {
s3Helper: s3Helper,
}

ch, err := connclickhouse.Connect(context.Background(), s.PeerForDatabase("default").GetClickhouseConfig())
ch, err := connclickhouse.Connect(context.Background(), nil, s.PeerForDatabase("default").GetClickhouseConfig())
require.NoError(t, err, "failed to connect to clickhouse")
err = ch.Exec(context.Background(), "CREATE DATABASE e2e_test_"+suffix)
require.NoError(t, err, "failed to create clickhouse database")
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/clickhouse/peer_flow_ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func (s ClickHouseSuite) WeirdTable(tableName string) {
})
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
// now test weird names with rename based resync
ch, err := connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig())
ch, err := connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig())
require.NoError(s.t, err)
require.NoError(s.t, ch.Exec(context.Background(), fmt.Sprintf("DROP TABLE `%s`", dstTableName)))
require.NoError(s.t, ch.Close())
Expand All @@ -523,7 +523,7 @@ func (s ClickHouseSuite) WeirdTable(tableName string) {
})
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
// now test weird names with exchange based resync
ch, err = connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig())
ch, err = connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig())
require.NoError(s.t, err)
require.NoError(s.t, ch.Exec(context.Background(), fmt.Sprintf("TRUNCATE TABLE `%s`", dstTableName)))
require.NoError(s.t, ch.Close())
Expand Down
36 changes: 30 additions & 6 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,22 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS",
Description: "Configures max_insert_threads setting on clickhouse for inserting into destination table. Setting left unset when 0",
DefaultValue: "0",
ValueType: protos.DynconfValueType_UINT,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE",
Description: "Divide tables in batch into N insert selects. Helps distribute load to multiple nodes",
DefaultValue: "0",
ValueType: protos.DynconfValueType_INT,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES",
Description: "Duration in minutes since last normalize to start alerting, 0 disables all alerting entirely",
Expand Down Expand Up @@ -248,8 +264,8 @@ func dynamicConfSigned[T constraints.Signed](ctx context.Context, env map[string
return strconv.ParseInt(value, 10, 64)
})
if err != nil {
shared.LoggerFromCtx(ctx).Error("Failed to parse as int64", slog.Any("error", err))
return 0, fmt.Errorf("failed to parse as int64: %w", err)
shared.LoggerFromCtx(ctx).Error("Failed to parse as int64", slog.String("key", key), slog.Any("error", err))
return 0, fmt.Errorf("failed to parse %s as int64: %w", key, err)
}

return T(value), nil
Expand All @@ -260,8 +276,8 @@ func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, env map[st
return strconv.ParseUint(value, 10, 64)
})
if err != nil {
shared.LoggerFromCtx(ctx).Error("Failed to parse as uint64", slog.Any("error", err))
return 0, fmt.Errorf("failed to parse as uint64: %w", err)
shared.LoggerFromCtx(ctx).Error("Failed to parse as uint64", slog.String("key", key), slog.Any("error", err))
return 0, fmt.Errorf("failed to parse %s as uint64: %w", key, err)
}

return T(value), nil
Expand All @@ -270,8 +286,8 @@ func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, env map[st
func dynamicConfBool(ctx context.Context, env map[string]string, key string) (bool, error) {
value, err := dynLookupConvert(ctx, env, key, strconv.ParseBool)
if err != nil {
shared.LoggerFromCtx(ctx).Error("Failed to parse bool", slog.Any("error", err))
return false, fmt.Errorf("failed to parse bool: %w", err)
shared.LoggerFromCtx(ctx).Error("Failed to parse bool", slog.String("key", key), slog.Any("error", err))
return false, fmt.Errorf("failed to parse %s as bool: %w", key, err)
}

return value, nil
Expand Down Expand Up @@ -362,6 +378,14 @@ func PeerDBEnableClickHousePrimaryUpdate(ctx context.Context, env map[string]str
return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE")
}

func PeerDBClickHouseMaxInsertThreads(ctx context.Context, env map[string]string) (int64, error) {
return dynamicConfSigned[int64](ctx, env, "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS")
}

// PeerDBClickHouseParallelNormalize looks up the
// PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE dynamic setting and returns it parsed
// as a signed int. Any lookup or parse error is passed back unchanged.
func PeerDBClickHouseParallelNormalize(ctx context.Context, env map[string]string) (int, error) {
	const settingName = "PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE"
	parallelism, err := dynamicConfSigned[int](ctx, env, settingName)
	return parallelism, err
}

// PeerDBSnowflakeMergeParallelism looks up the
// PEERDB_SNOWFLAKE_MERGE_PARALLELISM dynamic setting and returns it parsed
// as an int64. Any lookup or parse error is passed back unchanged.
func PeerDBSnowflakeMergeParallelism(ctx context.Context, env map[string]string) (int64, error) {
	const settingName = "PEERDB_SNOWFLAKE_MERGE_PARALLELISM"
	return dynamicConfSigned[int64](ctx, env, settingName)
}
Expand Down

0 comments on commit 37d0608

Please sign in to comment.