Merge branch 'main' into fix-go-mod

serprex authored Dec 4, 2024
2 parents 49392b0 + 1917c81 commit 2490a83
Showing 58 changed files with 586 additions and 324 deletions.
51 changes: 51 additions & 0 deletions .github/workflows/update-docker-compose-stable.yaml
@@ -0,0 +1,51 @@
name: Update docker-compose.yaml tags

on:
  schedule:
    - cron: '0 15 * * 1'
  workflow_dispatch:
    inputs: {}
permissions:
  issues: write
  pull-requests: write
  contents: write

env:
  PR_BRANCH: automated/docker-compose-image-tags-upgrade
  PR_LABEL: dependencies
  PR_TITLE: "feat: upgrade `docker-compose.yml` stable image tags"

jobs:
  update-docker-compose-tag:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          ref: main
      - name: create-PR
        shell: bash
        run: |
          set -eou pipefail
          latest_tag="$(gh api \
            -H "Accept: application/vnd.github+json" \
            -H "X-GitHub-Api-Version: 2022-11-28" \
            /repos/${{ github.repository }}/releases/latest | jq -r '.tag_name')"
          sed -i -E 's|(image: ghcr\.io/peerdb\-io/.*?:stable-)(.*$)|\1'"${latest_tag}"'|g' docker-compose.yml
          git config --global user.name "${GITHUB_ACTOR}"
          git config --global user.email "${GITHUB_ACTOR}@users.noreply.github.com"
          git checkout -b "${PR_BRANCH}"
          git fetch || true
          git add -u
          git commit -m 'chore(automated): upgrade docker-compose.yml stable tags'
          git push -u origin "${PR_BRANCH}" --force-with-lease
          PR_ID=$(gh pr list --label "${PR_LABEL}" --head "${PR_BRANCH}" --json number | jq -r '.[0].number // ""')
          if [ "$PR_ID" == "" ]; then
            PR_ID=$(gh pr create -l "$PR_LABEL" -t "$PR_TITLE" --body "")
          fi
          gh pr merge --auto --squash
        env:
          GH_TOKEN: ${{ github.token }}
10 changes: 5 additions & 5 deletions docker-compose.yml
@@ -112,7 +112,7 @@ services:

flow-api:
container_name: flow_api
image: ghcr.io/peerdb-io/flow-api:stable-v0.19.1@sha256:a759b2d1b14f11d09ade672c268abcb456fd8884468547ea0f467cdfb60a0994
image: ghcr.io/peerdb-io/flow-api:stable-v0.20.0
restart: unless-stopped
ports:
- 8112:8112
@@ -128,7 +128,7 @@ services:

flow-snapshot-worker:
container_name: flow-snapshot-worker
image: ghcr.io/peerdb-io/flow-snapshot-worker:stable-v0.19.1@sha256:894c1fea1cf9a4f5622420d8630509243b60cf177e107ec4d14d7294a9490451
image: ghcr.io/peerdb-io/flow-snapshot-worker:stable-v0.20.0
restart: unless-stopped
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
@@ -138,7 +138,7 @@ services:

flow-worker:
container_name: flow-worker
image: ghcr.io/peerdb-io/flow-worker:stable-v0.19.1@sha256:4482314bd3bd4a96930fbee10c00a9f2d5764e86cfd8802642589d339cf04054
image: ghcr.io/peerdb-io/flow-worker:stable-v0.20.0
restart: unless-stopped
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
@@ -151,7 +151,7 @@ services:
peerdb:
container_name: peerdb-server
stop_signal: SIGINT
image: ghcr.io/peerdb-io/peerdb-server:stable-v0.19.1@sha256:c736500e0b42f100df29af43ecf4c96d0c8f4805dd294fecd0bb4ce7b7897a18
image: ghcr.io/peerdb-io/peerdb-server:stable-v0.20.0
restart: unless-stopped
environment:
<<: *catalog-config
@@ -167,7 +167,7 @@ services:

peerdb-ui:
container_name: peerdb-ui
image: ghcr.io/peerdb-io/peerdb-ui:stable-v0.19.1@sha256:ffc4b5960dc1653a59e680c61fca0ba2c5891cb4965e4662927d9886f4d7f6bc
image: ghcr.io/peerdb-io/peerdb-ui:stable-v0.20.0
restart: unless-stopped
ports:
- 3000:3000
3 changes: 2 additions & 1 deletion flow/activities/flowable_core.go
@@ -225,7 +225,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}
defer connectors.CloseConnector(ctx, dstConn)

if err := dstConn.ReplayTableSchemaDeltas(ctx, flowName, recordBatchSync.SchemaDeltas); err != nil {
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, recordBatchSync.SchemaDeltas); err != nil {
return nil, fmt.Errorf("failed to sync schema: %w", err)
}

@@ -440,6 +440,7 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
})

errGroup.Go(func() error {
var err error
rowsSynced, err = syncRecords(dstConn, errCtx, config, partition, outstream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
5 changes: 2 additions & 3 deletions flow/cmd/peer_data.go
@@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"log/slog"
"strings"
"time"

"github.com/jackc/pgx/v5"
@@ -120,7 +119,7 @@ func (h *FlowRequestHandler) ListPeers(
req *protos.ListPeersRequest,
) (*protos.ListPeersResponse, error) {
query := "SELECT name, type FROM peers"
if peerdbenv.PeerDBAllowedTargets() == strings.ToLower(protos.DBType_CLICKHOUSE.String()) {
if peerdbenv.PeerDBOnlyClickHouseAllowed() {
// only postgres and clickhouse peers
query += " WHERE type IN (3, 8)"
}
@@ -148,7 +147,7 @@ func (h *FlowRequestHandler) ListPeers(
}

destinationItems := peers
if peerdbenv.PeerDBAllowedTargets() == strings.ToLower(protos.DBType_CLICKHOUSE.String()) {
if peerdbenv.PeerDBOnlyClickHouseAllowed() {
destinationItems = make([]*protos.PeerListItem, 0, len(peers))
for _, peer := range peers {
// only clickhouse peers
3 changes: 1 addition & 2 deletions flow/cmd/settings.go
@@ -4,7 +4,6 @@ import (
"context"
"log/slog"
"slices"
"strings"

"github.com/jackc/pgx/v5"

@@ -37,7 +36,7 @@ func (h *FlowRequestHandler) GetDynamicSettings(
return nil, err
}

if peerdbenv.PeerDBAllowedTargets() == strings.ToLower(protos.DBType_CLICKHOUSE.String()) {
if peerdbenv.PeerDBOnlyClickHouseAllowed() {
filteredSettings := make([]*protos.DynamicSetting, 0)
for _, setting := range settings {
if setting.TargetForSetting == protos.DynconfTarget_ALL ||
1 change: 1 addition & 0 deletions flow/connectors/bigquery/bigquery.go
@@ -203,6 +203,7 @@ func (c *BigQueryConnector) waitForTableReady(ctx context.Context, datasetTable
// This could involve adding or dropping multiple columns.
func (c *BigQueryConnector) ReplayTableSchemaDeltas(
ctx context.Context,
env map[string]string,
flowJobName string,
schemaDeltas []*protos.TableSchemaDelta,
) error {
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep.go
@@ -35,7 +35,7 @@ func (c *BigQueryConnector) SyncQRepRecords(
partition.PartitionId, destTable))

avroSync := NewQRepAvroSyncMethod(c, config.StagingPath, config.FlowJobName)
return avroSync.SyncQRepRecords(ctx, config.FlowJobName, destTable, partition,
return avroSync.SyncQRepRecords(ctx, config.Env, config.FlowJobName, destTable, partition,
tblMetadata, stream, config.SyncedAtColName, config.SoftDeleteColName)
}

@@ -80,7 +80,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
}
}

err = c.ReplayTableSchemaDeltas(ctx, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
err = c.ReplayTableSchemaDeltas(ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
if err != nil {
return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
}
12 changes: 7 additions & 5 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -55,7 +55,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
}

stagingTable := fmt.Sprintf("%s_%s_staging", rawTableName, strconv.FormatInt(syncBatchID, 10))
numRecords, err := s.writeToStage(ctx, strconv.FormatInt(syncBatchID, 10), rawTableName, avroSchema,
numRecords, err := s.writeToStage(ctx, req.Env, strconv.FormatInt(syncBatchID, 10), rawTableName, avroSchema,
&datasetTable{
project: s.connector.projectID,
dataset: s.connector.datasetID,
@@ -97,7 +97,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
slog.String(string(shared.FlowNameKey), req.FlowJobName),
slog.String("dstTableName", rawTableName))

err = s.connector.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
err = s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}
@@ -139,6 +139,7 @@ func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softD

func (s *QRepAvroSyncMethod) SyncQRepRecords(
ctx context.Context,
env map[string]string,
flowJobName string,
dstTableName string,
partition *protos.QRepPartition,
@@ -167,7 +168,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
table: fmt.Sprintf("%s_%s_staging", dstDatasetTable.table,
strings.ReplaceAll(partition.PartitionId, "-", "_")),
}
numRecords, err := s.writeToStage(ctx, partition.PartitionId, flowJobName, avroSchema,
numRecords, err := s.writeToStage(ctx, env, partition.PartitionId, flowJobName, avroSchema,
stagingDatasetTable, stream, flowJobName)
if err != nil {
return -1, fmt.Errorf("failed to push to avro stage: %w", err)
@@ -389,6 +390,7 @@ func GetAvroField(bqField *bigquery.FieldSchema) (AvroField, error) {

func (s *QRepAvroSyncMethod) writeToStage(
ctx context.Context,
env map[string]string,
syncID string,
objectFolder string,
avroSchema *model.QRecordAvroSchemaDefinition,
@@ -408,7 +410,7 @@
obj := bucket.Object(avroFilePath)
w := obj.NewWriter(ctx)

numRecords, err := ocfWriter.WriteOCF(ctx, w)
numRecords, err := ocfWriter.WriteOCF(ctx, env, w)
if err != nil {
return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
}
@@ -426,7 +428,7 @@

avroFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
s.connector.logger.Info("writing records to local file", idLog)
avroFile, err = ocfWriter.WriteRecordsToAvroFile(ctx, avroFilePath)
avroFile, err = ocfWriter.WriteRecordsToAvroFile(ctx, env, avroFilePath)
if err != nil {
return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
}
9 changes: 6 additions & 3 deletions flow/connectors/clickhouse/cdc.go
@@ -93,7 +93,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
return nil, err
}

if err := c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

@@ -120,7 +120,10 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
return res, nil
}

func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJobName string,
func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
ctx context.Context,
env map[string]string,
flowJobName string,
schemaDeltas []*protos.TableSchemaDelta,
) error {
if len(schemaDeltas) == 0 {
@@ -133,7 +136,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJ
}

for _, addedColumn := range schemaDelta.AddedColumns {
clickHouseColType, err := qvalue.QValueKind(addedColumn.Type).ToDWHColumnType(protos.DBType_CLICKHOUSE)
clickHouseColType, err := qvalue.QValueKind(addedColumn.Type).ToDWHColumnType(ctx, env, protos.DBType_CLICKHOUSE, addedColumn)
if err != nil {
return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err)
}
18 changes: 17 additions & 1 deletion flow/connectors/clickhouse/clickhouse.go
@@ -82,7 +82,7 @@ func (c *ClickHouseConnector) ValidateCheck(ctx context.Context) error {
return fmt.Errorf("failed to create validation table %s: %w", validateDummyTableName, err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
if err := c.exec(ctx, "DROP TABLE IF EXISTS "+validateDummyTableName); err != nil {
c.logger.Error("validation failed to drop table", slog.String("table", validateDummyTableName), slog.Any("error", err))
Expand Down Expand Up @@ -396,6 +396,9 @@ func (c *ClickHouseConnector) checkTablesEmptyAndEngine(ctx context.Context, tab
c.logger.Warn("[clickhouse] table engine not explicitly supported",
slog.String("table", tableName), slog.String("engine", engine))
}
if peerdbenv.PeerDBOnlyClickHouseAllowed() && !strings.HasPrefix(engine, "Shared") {
return fmt.Errorf("table %s exists and does not use SharedMergeTree engine", tableName)
}
}
if err := rows.Err(); err != nil {
return fmt.Errorf("failed to read rows: %w", err)
@@ -476,6 +479,19 @@ func (c *ClickHouseConnector) processTableComparison(dstTableName string, srcSch
func (c *ClickHouseConnector) CheckDestinationTables(ctx context.Context, req *protos.FlowConnectionConfigs,
tableNameSchemaMapping map[string]*protos.TableSchema,
) error {
if peerdbenv.PeerDBOnlyClickHouseAllowed() {
// this is to indicate ClickHouse Cloud service is now creating tables with Shared* by default
var cloudModeEngine bool
if err := c.queryRow(ctx,
"SELECT value='2' AND changed='1' AND readonly='1' FROM system.settings WHERE name = 'cloud_mode_engine'").
Scan(&cloudModeEngine); err != nil {
return fmt.Errorf("failed to validate cloud_mode_engine setting: %w", err)
}
if !cloudModeEngine {
return errors.New("ClickHouse service is not migrated to use SharedMergeTree tables, please contact support")
}
}

peerDBColumns := []string{signColName, versionColName}
if req.SyncedAtColName != "" {
peerDBColumns = append(peerDBColumns, strings.ToLower(req.SyncedAtColName))
38 changes: 10 additions & 28 deletions flow/connectors/clickhouse/normalize.go
@@ -15,7 +15,6 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"golang.org/x/sync/errgroup"

"github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
@@ -81,16 +80,6 @@ func getColName(overrides map[string]string, name string) string {
return name
}

func getClickhouseTypeForNumericColumn(column *protos.FieldDescription) string {
rawPrecision, _ := datatypes.ParseNumericTypmod(column.TypeModifier)
if rawPrecision > datatypes.PeerDBClickHouseMaxPrecision {
return "String"
} else {
precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{})
return fmt.Sprintf("Decimal(%d, %d)", precision, scale)
}
}

func generateCreateTableSQLForNormalizedTable(
ctx context.Context,
config *protos.SetupNormalizedTableBatchInput,
@@ -142,14 +131,10 @@
}

if clickHouseType == "" {
if colType == qvalue.QValueKindNumeric {
clickHouseType = getClickhouseTypeForNumericColumn(column)
} else {
var err error
clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
if err != nil {
return "", fmt.Errorf("error while converting column type to ClickHouse type: %w", err)
}
var err error
clickHouseType, err = colType.ToDWHColumnType(ctx, config.Env, protos.DBType_CLICKHOUSE, column)
if err != nil {
return "", fmt.Errorf("error while converting column type to ClickHouse type: %w", err)
}
}
if (tableSchema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() {
@@ -368,16 +353,13 @@

colSelector.WriteString(fmt.Sprintf("`%s`,", dstColName))
if clickHouseType == "" {
if colType == qvalue.QValueKindNumeric {
clickHouseType = getClickhouseTypeForNumericColumn(column)
} else {
var err error
clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
if err != nil {
close(queries)
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}
var err error
clickHouseType, err = colType.ToDWHColumnType(ctx, req.Env, protos.DBType_CLICKHOUSE, column)
if err != nil {
close(queries)
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}

if (schema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() {
clickHouseType = fmt.Sprintf("Nullable(%s)", clickHouseType)
}