diff --git a/.github/workflows/update-docker-compose-stable.yaml b/.github/workflows/update-docker-compose-stable.yaml new file mode 100644 index 0000000000..437d83ea49 --- /dev/null +++ b/.github/workflows/update-docker-compose-stable.yaml @@ -0,0 +1,51 @@ +name: Update docker-compose.yaml tags + +on: + schedule: + - cron: '0 15 * * 1' + workflow_dispatch: + inputs: {} +permissions: + issues: write + pull-requests: write + contents: write + + +env: + PR_BRANCH: automated/docker-compose-image-tags-upgrade + PR_LABEL: dependencies + PR_TITLE: "feat: upgrade `docker-compose.yml` stable image tags" + +jobs: + update-docker-compose-tag: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + ref: main + - name: create-PR + shell: bash + run: | + set -eou pipefail + latest_tag="$(gh api \ + -H "Accept: application/vnd.github+json" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + /repos/${{ github.repository }}/releases/latest | jq -r '.tag_name')" + sed -i -E 's|(image: ghcr\.io/peerdb\-io/.*?:stable-)(.*$)|\1'"${latest_tag}"'|g' docker-compose.yml + git config --global user.name "${GITHUB_ACTOR}" + git config --global user.email "${GITHUB_ACTOR}@users.noreply.github.com" + git checkout -b "${PR_BRANCH}" + git fetch || true + git add -u + git commit -m 'chore(automated): upgrade docker-compose.yml stable tags' + git push -u origin "${PR_BRANCH}" --force-with-lease + + PR_ID=$(gh pr list --label "${PR_LABEL}" --head "${PR_BRANCH}" --json number | jq -r '.[0].number // ""') + if [ "$PR_ID" == "" ]; then + PR_ID=$(gh pr create -l "$PR_LABEL" -t "$PR_TITLE" --body "") + fi + + + gh pr merge --auto --squash + env: + GH_TOKEN: ${{ github.token }} diff --git a/docker-compose.yml b/docker-compose.yml index c7991b9e22..b2c5936a2b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -112,7 +112,7 @@ services: flow-api: container_name: flow_api - image: ghcr.io/peerdb-io/flow-api:stable-v0.19.1@sha256:a759b2d1b14f11d09ade672c268abcb456fd8884468547ea0f467cdfb60a0994 + image: ghcr.io/peerdb-io/flow-api:stable-v0.20.0 restart: unless-stopped ports: - 8112:8112 @@ -128,7 +128,7 @@ services: flow-snapshot-worker: container_name: flow-snapshot-worker - image: ghcr.io/peerdb-io/flow-snapshot-worker:stable-v0.19.1@sha256:894c1fea1cf9a4f5622420d8630509243b60cf177e107ec4d14d7294a9490451 + image: ghcr.io/peerdb-io/flow-snapshot-worker:stable-v0.20.0 restart: unless-stopped environment: <<: [*catalog-config, *flow-worker-env, *minio-config] @@ -138,7 +138,7 @@ services: flow-worker: container_name: flow-worker - image: ghcr.io/peerdb-io/flow-worker:stable-v0.19.1@sha256:4482314bd3bd4a96930fbee10c00a9f2d5764e86cfd8802642589d339cf04054 + image: ghcr.io/peerdb-io/flow-worker:stable-v0.20.0 restart: unless-stopped environment: <<: [*catalog-config, *flow-worker-env, *minio-config] @@ -151,7 +151,7 @@ services: peerdb: container_name: peerdb-server stop_signal: SIGINT - image: ghcr.io/peerdb-io/peerdb-server:stable-v0.19.1@sha256:c736500e0b42f100df29af43ecf4c96d0c8f4805dd294fecd0bb4ce7b7897a18 + image: ghcr.io/peerdb-io/peerdb-server:stable-v0.20.0 restart: unless-stopped environment: <<: *catalog-config @@ -167,7 +167,7 @@ services: peerdb-ui: container_name: peerdb-ui - image: ghcr.io/peerdb-io/peerdb-ui:stable-v0.19.1@sha256:ffc4b5960dc1653a59e680c61fca0ba2c5891cb4965e4662927d9886f4d7f6bc + image: ghcr.io/peerdb-io/peerdb-ui:stable-v0.20.0 restart: unless-stopped ports: - 3000:3000 diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 2d1f7e1f3e..a9f58a5f3f 100644 
--- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -225,7 +225,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon } defer connectors.CloseConnector(ctx, dstConn) - if err := dstConn.ReplayTableSchemaDeltas(ctx, flowName, recordBatchSync.SchemaDeltas); err != nil { + if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, recordBatchSync.SchemaDeltas); err != nil { return nil, fmt.Errorf("failed to sync schema: %w", err) } @@ -440,6 +440,7 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn }) errGroup.Go(func() error { + var err error rowsSynced, err = syncRecords(dstConn, errCtx, config, partition, outstream) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index cb625978fe..cc41f6ef24 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "log/slog" - "strings" "time" "github.com/jackc/pgx/v5" @@ -120,7 +119,7 @@ func (h *FlowRequestHandler) ListPeers( req *protos.ListPeersRequest, ) (*protos.ListPeersResponse, error) { query := "SELECT name, type FROM peers" - if peerdbenv.PeerDBAllowedTargets() == strings.ToLower(protos.DBType_CLICKHOUSE.String()) { + if peerdbenv.PeerDBOnlyClickHouseAllowed() { // only postgres and clickhouse peers query += " WHERE type IN (3, 8)" } @@ -148,7 +147,7 @@ func (h *FlowRequestHandler) ListPeers( } destinationItems := peers - if peerdbenv.PeerDBAllowedTargets() == strings.ToLower(protos.DBType_CLICKHOUSE.String()) { + if peerdbenv.PeerDBOnlyClickHouseAllowed() { destinationItems = make([]*protos.PeerListItem, 0, len(peers)) for _, peer := range peers { // only clickhouse peers diff --git a/flow/cmd/settings.go b/flow/cmd/settings.go index dd4755f4ae..b2be7a1ba6 100644 --- a/flow/cmd/settings.go +++ b/flow/cmd/settings.go @@ -4,7 +4,6 @@ import ( "context" "log/slog" "slices" - "strings" "github.com/jackc/pgx/v5" @@ -37,7 +36,7 @@ func (h *FlowRequestHandler) GetDynamicSettings( return nil, err } - if peerdbenv.PeerDBAllowedTargets() == strings.ToLower(protos.DBType_CLICKHOUSE.String()) { + if peerdbenv.PeerDBOnlyClickHouseAllowed() { filteredSettings := make([]*protos.DynamicSetting, 0) for _, setting := range settings { if setting.TargetForSetting == protos.DynconfTarget_ALL || diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index f990b2f19d..d6504322ca 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -203,6 +203,7 @@ func (c *BigQueryConnector) waitForTableReady(ctx context.Context, datasetTable // This could involve adding or dropping multiple columns. 
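Note on the replicateQRepPartition hunk above: the added `var err error` keeps the error assignment local to the errgroup closure, so `rowsSynced, err = ...` assigns to the captured `rowsSynced` instead of forcing a `:=` that would shadow it (or writing to an `err` shared with other goroutines). A minimal standalone sketch of the pattern; this is not PeerDB code, and `doSync` is a placeholder:

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func doSync() (int, error) { return 42, nil }

func main() {
	var rowsSynced int
	var g errgroup.Group

	g.Go(func() error {
		// Pitfall: `rowsSynced, err := doSync()` would declare a new rowsSynced
		// local to this closure, leaving the outer variable at its zero value.
		var err error
		rowsSynced, err = doSync() // assigns to the captured variable
		return err
	})

	if err := g.Wait(); err != nil {
		fmt.Println("sync failed:", err)
		return
	}
	fmt.Println("rows synced:", rowsSynced) // 42
}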
func (c *BigQueryConnector) ReplayTableSchemaDeltas( ctx context.Context, + env map[string]string, flowJobName string, schemaDeltas []*protos.TableSchemaDelta, ) error { diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index 3da50c8e8f..b184cc62a9 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -35,7 +35,7 @@ func (c *BigQueryConnector) SyncQRepRecords( partition.PartitionId, destTable)) avroSync := NewQRepAvroSyncMethod(c, config.StagingPath, config.FlowJobName) - return avroSync.SyncQRepRecords(ctx, config.FlowJobName, destTable, partition, + return avroSync.SyncQRepRecords(ctx, config.Env, config.FlowJobName, destTable, partition, tblMetadata, stream, config.SyncedAtColName, config.SoftDeleteColName) } @@ -80,7 +80,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep( } } - err = c.ReplayTableSchemaDeltas(ctx, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta}) + err = c.ReplayTableSchemaDeltas(ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta}) if err != nil { return nil, fmt.Errorf("failed to add columns to destination table: %w", err) } diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index da3b15c37f..07285eb997 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -55,7 +55,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( } stagingTable := fmt.Sprintf("%s_%s_staging", rawTableName, strconv.FormatInt(syncBatchID, 10)) - numRecords, err := s.writeToStage(ctx, strconv.FormatInt(syncBatchID, 10), rawTableName, avroSchema, + numRecords, err := s.writeToStage(ctx, req.Env, strconv.FormatInt(syncBatchID, 10), rawTableName, avroSchema, &datasetTable{ project: s.connector.projectID, dataset: s.connector.datasetID, @@ -97,7 +97,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( slog.String(string(shared.FlowNameKey), req.FlowJobName), slog.String("dstTableName", rawTableName)) - err = s.connector.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas) + err = s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas) if err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } @@ -139,6 +139,7 @@ func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softD func (s *QRepAvroSyncMethod) SyncQRepRecords( ctx context.Context, + env map[string]string, flowJobName string, dstTableName string, partition *protos.QRepPartition, @@ -167,7 +168,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( table: fmt.Sprintf("%s_%s_staging", dstDatasetTable.table, strings.ReplaceAll(partition.PartitionId, "-", "_")), } - numRecords, err := s.writeToStage(ctx, partition.PartitionId, flowJobName, avroSchema, + numRecords, err := s.writeToStage(ctx, env, partition.PartitionId, flowJobName, avroSchema, stagingDatasetTable, stream, flowJobName) if err != nil { return -1, fmt.Errorf("failed to push to avro stage: %w", err) @@ -389,6 +390,7 @@ func GetAvroField(bqField *bigquery.FieldSchema) (AvroField, error) { func (s *QRepAvroSyncMethod) writeToStage( ctx context.Context, + env map[string]string, syncID string, objectFolder string, avroSchema *model.QRecordAvroSchemaDefinition, @@ -408,7 +410,7 @@ func (s *QRepAvroSyncMethod) writeToStage( obj := bucket.Object(avroFilePath) w := obj.NewWriter(ctx) - numRecords, err := ocfWriter.WriteOCF(ctx, w) + numRecords, err := ocfWriter.WriteOCF(ctx, env, w) if err != 
nil { return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err) } @@ -426,7 +428,7 @@ func (s *QRepAvroSyncMethod) writeToStage( avroFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, syncID) s.connector.logger.Info("writing records to local file", idLog) - avroFile, err = ocfWriter.WriteRecordsToAvroFile(ctx, avroFilePath) + avroFile, err = ocfWriter.WriteRecordsToAvroFile(ctx, env, avroFilePath) if err != nil { return 0, fmt.Errorf("failed to write records to local Avro file: %w", err) } diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index d3eb883b46..5dc8a14628 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -93,7 +93,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro( return nil, err } - if err := c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas); err != nil { + if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas); err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } @@ -120,7 +120,10 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe return res, nil } -func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJobName string, +func (c *ClickHouseConnector) ReplayTableSchemaDeltas( + ctx context.Context, + env map[string]string, + flowJobName string, schemaDeltas []*protos.TableSchemaDelta, ) error { if len(schemaDeltas) == 0 { @@ -133,7 +136,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJ } for _, addedColumn := range schemaDelta.AddedColumns { - clickHouseColType, err := qvalue.QValueKind(addedColumn.Type).ToDWHColumnType(protos.DBType_CLICKHOUSE) + clickHouseColType, err := qvalue.QValueKind(addedColumn.Type).ToDWHColumnType(ctx, env, protos.DBType_CLICKHOUSE, addedColumn) if err != nil { return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err) } diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index f024a767e4..bf7e1b4c93 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -82,7 +82,7 @@ func (c *ClickHouseConnector) ValidateCheck(ctx context.Context) error { return fmt.Errorf("failed to create validation table %s: %w", validateDummyTableName, err) } defer func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() if err := c.exec(ctx, "DROP TABLE IF EXISTS "+validateDummyTableName); err != nil { c.logger.Error("validation failed to drop table", slog.String("table", validateDummyTableName), slog.Any("error", err)) @@ -396,6 +396,9 @@ func (c *ClickHouseConnector) checkTablesEmptyAndEngine(ctx context.Context, tab c.logger.Warn("[clickhouse] table engine not explicitly supported", slog.String("table", tableName), slog.String("engine", engine)) } + if peerdbenv.PeerDBOnlyClickHouseAllowed() && !strings.HasPrefix(engine, "Shared") { + return fmt.Errorf("table %s exists and does not use SharedMergeTree engine", tableName) + } } if err := rows.Err(); err != nil { return fmt.Errorf("failed to read rows: %w", err) @@ -476,6 +479,19 @@ func (c *ClickHouseConnector) processTableComparison(dstTableName string, srcSch func (c *ClickHouseConnector) CheckDestinationTables(ctx context.Context, req *protos.FlowConnectionConfigs, tableNameSchemaMapping 
map[string]*protos.TableSchema, ) error { + if peerdbenv.PeerDBOnlyClickHouseAllowed() { + // this is to indicate ClickHouse Cloud service is now creating tables with Shared* by default + var cloudModeEngine bool + if err := c.queryRow(ctx, + "SELECT value='2' AND changed='1' AND readonly='1' FROM system.settings WHERE name = 'cloud_mode_engine'"). + Scan(&cloudModeEngine); err != nil { + return fmt.Errorf("failed to validate cloud_mode_engine setting: %w", err) + } + if !cloudModeEngine { + return errors.New("ClickHouse service is not migrated to use SharedMergeTree tables, please contact support") + } + } + peerDBColumns := []string{signColName, versionColName} if req.SyncedAtColName != "" { peerDBColumns = append(peerDBColumns, strings.ToLower(req.SyncedAtColName)) diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 2debe0f4d5..fabe07a35f 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -15,7 +15,6 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "golang.org/x/sync/errgroup" - "github.com/PeerDB-io/peer-flow/datatypes" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -81,16 +80,6 @@ func getColName(overrides map[string]string, name string) string { return name } -func getClickhouseTypeForNumericColumn(column *protos.FieldDescription) string { - rawPrecision, _ := datatypes.ParseNumericTypmod(column.TypeModifier) - if rawPrecision > datatypes.PeerDBClickHouseMaxPrecision { - return "String" - } else { - precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{}) - return fmt.Sprintf("Decimal(%d, %d)", precision, scale) - } -} - func generateCreateTableSQLForNormalizedTable( ctx context.Context, config *protos.SetupNormalizedTableBatchInput, @@ -142,14 +131,10 @@ func generateCreateTableSQLForNormalizedTable( } if clickHouseType == "" { - if colType == qvalue.QValueKindNumeric { - clickHouseType = getClickhouseTypeForNumericColumn(column) - } else { - var err error - clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE) - if err != nil { - return "", fmt.Errorf("error while converting column type to ClickHouse type: %w", err) - } + var err error + clickHouseType, err = colType.ToDWHColumnType(ctx, config.Env, protos.DBType_CLICKHOUSE, column) + if err != nil { + return "", fmt.Errorf("error while converting column type to ClickHouse type: %w", err) } } if (tableSchema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() { @@ -368,16 +353,13 @@ func (c *ClickHouseConnector) NormalizeRecords( colSelector.WriteString(fmt.Sprintf("`%s`,", dstColName)) if clickHouseType == "" { - if colType == qvalue.QValueKindNumeric { - clickHouseType = getClickhouseTypeForNumericColumn(column) - } else { - var err error - clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE) - if err != nil { - close(queries) - return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err) - } + var err error + clickHouseType, err = colType.ToDWHColumnType(ctx, req.Env, protos.DBType_CLICKHOUSE, column) + if err != nil { + close(queries) + return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err) } + if (schema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() { clickHouseType = fmt.Sprintf("Nullable(%s)", 
clickHouseType) } diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index fa2cfe1034..61450dd55c 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -71,7 +71,7 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords( s.logger.Info("sync function called and schema acquired", slog.String("dstTable", dstTableName)) - avroSchema, err := s.getAvroSchema(dstTableName, schema) + avroSchema, err := s.getAvroSchema(ctx, env, dstTableName, schema) if err != nil { return 0, err } @@ -106,7 +106,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( stagingPath := s.credsProvider.BucketPath startTime := time.Now() - avroSchema, err := s.getAvroSchema(dstTableName, stream.Schema()) + avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, stream.Schema()) if err != nil { return 0, err } @@ -165,10 +165,12 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( } func (s *ClickHouseAvroSyncMethod) getAvroSchema( + ctx context.Context, + env map[string]string, dstTableName string, schema qvalue.QRecordSchema, ) (*model.QRecordAvroSchemaDefinition, error) { - avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, protos.DBType_CLICKHOUSE) + avroSchema, err := model.GetAvroSchemaDefinition(ctx, env, dstTableName, schema, protos.DBType_CLICKHOUSE) if err != nil { return nil, fmt.Errorf("failed to define Avro schema: %w", err) } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 073d9d82b4..0991a50978 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -173,7 +173,7 @@ type CDCSyncConnectorCore interface { // ReplayTableSchemaDelta changes a destination table to match the schema at source // This could involve adding or dropping multiple columns. // Connectors which are non-normalizing should implement this as a nop. 
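Context for the ToDWHColumnType signature change running through the ClickHouse hunks above: passing ctx, env, and the column lets the numeric mapping use the column's type modifier and the new PEERDB_CLICKHOUSE_UNBOUNDED_NUMERIC_AS_STRING setting, replacing the special case removed from normalize.go. A rough sketch of that decision, built from the datatypes and peerdbenv helpers that appear elsewhere in this diff; it approximates the behavior and is not the actual ToDWHColumnType body:

package example

import (
	"context"
	"fmt"

	"github.com/PeerDB-io/peer-flow/datatypes"
	"github.com/PeerDB-io/peer-flow/generated/protos"
	"github.com/PeerDB-io/peer-flow/peerdbenv"
)

// clickHouseNumericType is a sketch: derive a ClickHouse column type for a
// Postgres NUMERIC column, honoring the unbounded-numeric-as-String setting.
func clickHouseNumericType(ctx context.Context, env map[string]string,
	column *protos.FieldDescription,
) (string, error) {
	precision, scale := datatypes.ParseNumericTypmod(column.TypeModifier)
	if precision == 0 && scale == 0 { // unbounded NUMERIC (typmod -1)
		asString, err := peerdbenv.PeerDBEnableClickHouseNumericAsString(ctx, env)
		if err != nil {
			return "", err
		}
		if asString {
			return "String", nil
		}
	}
	p, s := datatypes.GetNumericTypeForWarehousePrecisionScale(
		precision, scale, datatypes.ClickHouseNumericCompatibility{})
	return fmt.Sprintf("Decimal(%d, %d)", p, s), nil
}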
- ReplayTableSchemaDeltas(ctx context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error + ReplayTableSchemaDeltas(ctx context.Context, env map[string]string, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error } type CDCSyncConnector interface { @@ -463,8 +463,6 @@ var ( _ CDCSyncConnector = &connclickhouse.ClickHouseConnector{} _ CDCSyncConnector = &connelasticsearch.ElasticsearchConnector{} - _ CDCSyncPgConnector = &connpostgres.PostgresConnector{} - _ CDCNormalizeConnector = &connpostgres.PostgresConnector{} _ CDCNormalizeConnector = &connbigquery.BigQueryConnector{} _ CDCNormalizeConnector = &connsnowflake.SnowflakeConnector{} diff --git a/flow/connectors/elasticsearch/elasticsearch.go b/flow/connectors/elasticsearch/elasticsearch.go index e675168051..30279fd74e 100644 --- a/flow/connectors/elasticsearch/elasticsearch.go +++ b/flow/connectors/elasticsearch/elasticsearch.go @@ -95,7 +95,7 @@ func (esc *ElasticsearchConnector) CreateRawTable(ctx context.Context, } // we handle schema changes by not handling them since no mapping is being enforced right now -func (esc *ElasticsearchConnector) ReplayTableSchemaDeltas(ctx context.Context, +func (esc *ElasticsearchConnector) ReplayTableSchemaDeltas(ctx context.Context, env map[string]string, flowJobName string, schemaDeltas []*protos.TableSchemaDelta, ) error { return nil diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 01982bf713..0f175233ef 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -380,7 +380,9 @@ func (c *EventHubConnector) CreateRawTable(ctx context.Context, req *protos.Crea }, nil } -func (c *EventHubConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error { +func (c *EventHubConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string, + flowJobName string, schemaDeltas []*protos.TableSchemaDelta, +) error { c.logger.Info("ReplayTableSchemaDeltas for event hub is a no-op") return nil } diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 3e134968dc..5515eae880 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -14,8 +14,8 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub" cmap "github.com/orcaman/concurrent-map/v2" - "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -186,10 +186,10 @@ func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedE func (m *EventHubManager) getEventHubMgmtClient(subID string) (*armeventhub.EventHubsClient, error) { if subID == "" { - envSubID, err := utils.GetAzureSubscriptionID() - if err != nil { - slog.Error("failed to get azure subscription id", slog.Any("error", err)) - return nil, err + envSubID := peerdbenv.GetEnvString("AZURE_SUBSCRIPTION_ID", "") + if envSubID == "" { + slog.Error("couldn't find AZURE_SUBSCRIPTION_ID in environment") + return nil, errors.New("couldn't find AZURE_SUBSCRIPTION_ID in environment") } subID = envSubID } diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index ea0805b84b..ee78093fe6 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -149,7 +149,9 @@ func (c *KafkaConnector) SetupMetadataTables(_ 
context.Context) error { return nil } -func (c *KafkaConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error { +func (c *KafkaConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string, + flowJobName string, schemaDeltas []*protos.TableSchemaDelta, +) error { return nil } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 14b827cc89..8f49545fff 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -592,7 +592,7 @@ func syncRecordsCore[Items model.Items]( return nil, err } - err = c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas) + err = c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas) if err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } @@ -941,6 +941,7 @@ func (c *PostgresConnector) SetupNormalizedTable( // This could involve adding or dropping multiple columns. func (c *PostgresConnector) ReplayTableSchemaDeltas( ctx context.Context, + _ map[string]string, flowJobName string, schemaDeltas []*protos.TableSchemaDelta, ) error { diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 946b20eb3e..0b6668a5a2 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -58,7 +58,7 @@ func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) require.NoError(s.t, err) - err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{ + err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: []*protos.FieldDescription{ @@ -113,7 +113,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { } } - err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{ + err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, @@ -144,7 +144,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { } } - err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{ + err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, @@ -175,7 +175,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { } } - err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{ + err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, diff --git a/flow/connectors/postgres/qrep_partition_test.go b/flow/connectors/postgres/qrep_partition_test.go index 0249b75fc1..a81df27698 100644 --- a/flow/connectors/postgres/qrep_partition_test.go +++ b/flow/connectors/postgres/qrep_partition_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "math/rand/v2" "testing" "time" @@ -84,11 
+85,8 @@ func TestGetQRepPartitions(t *testing.T) { } defer conn.Close(context.Background()) - // Generate a random schema name - rndUint, err := shared.RandomUInt64() - if err != nil { - t.Fatalf("Failed to generate random uint: %v", err) - } + //nolint:gosec // Generate a random schema name, number has no cryptographic significance + rndUint := rand.Uint64() schemaName := fmt.Sprintf("test_%d", rndUint) // Create the schema diff --git a/flow/connectors/postgres/sink_q.go b/flow/connectors/postgres/sink_q.go index 89dab6a94f..21a39627be 100644 --- a/flow/connectors/postgres/sink_q.go +++ b/flow/connectors/postgres/sink_q.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "math/rand/v2" "github.com/jackc/pgx/v5" @@ -35,20 +36,15 @@ func (stream RecordStreamSink) ExecuteQueryWithTx( } } - randomUint, err := shared.RandomUInt64() - if err != nil { - qe.logger.Error("[pg_query_executor] failed to generate random uint", slog.Any("error", err)) - err = fmt.Errorf("[pg_query_executor] failed to generate random uint: %w", err) - stream.Close(err) - return 0, err - } + //nolint:gosec // number has no cryptographic significance + randomUint := rand.Uint64() cursorName := fmt.Sprintf("peerdb_cursor_%d", randomUint) fetchSize := shared.FetchAndChannelSize cursorQuery := fmt.Sprintf("DECLARE %s CURSOR FOR %s", cursorName, query) qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", cursorQuery, args)) - _, err = tx.Exec(ctx, cursorQuery, args...) - if err != nil { + + if _, err := tx.Exec(ctx, cursorQuery, args...); err != nil { qe.logger.Info("[pg_query_executor] failed to declare cursor", slog.String("cursorQuery", cursorQuery), slog.Any("error", err)) err = fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err) diff --git a/flow/connectors/pubsub/pubsub.go b/flow/connectors/pubsub/pubsub.go index 49aed379c4..537cda7241 100644 --- a/flow/connectors/pubsub/pubsub.go +++ b/flow/connectors/pubsub/pubsub.go @@ -67,7 +67,9 @@ func (c *PubSubConnector) CreateRawTable(ctx context.Context, req *protos.Create return &protos.CreateRawTableOutput{TableIdentifier: "n/a"}, nil } -func (c *PubSubConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error { +func (c *PubSubConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string, + flowJobName string, schemaDeltas []*protos.TableSchemaDelta, +) error { return nil } diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go index 9fbb485ab8..968c956aab 100644 --- a/flow/connectors/s3/qrep.go +++ b/flow/connectors/s3/qrep.go @@ -20,7 +20,7 @@ func (c *S3Connector) SyncQRepRecords( schema := stream.Schema() dstTableName := config.DestinationTableIdentifier - avroSchema, err := getAvroSchema(dstTableName, schema) + avroSchema, err := getAvroSchema(ctx, config.Env, dstTableName, schema) if err != nil { return 0, err } @@ -34,10 +34,12 @@ func (c *S3Connector) SyncQRepRecords( } func getAvroSchema( + ctx context.Context, + env map[string]string, dstTableName string, schema qvalue.QRecordSchema, ) (*model.QRecordAvroSchemaDefinition, error) { - avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, protos.DBType_S3) + avroSchema, err := model.GetAvroSchemaDefinition(ctx, env, dstTableName, schema, protos.DBType_S3) if err != nil { return nil, fmt.Errorf("failed to define Avro schema: %w", err) } diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index eac37cd7c8..7d16a20af0 100644 --- 
a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -118,7 +118,9 @@ func (c *S3Connector) SyncRecords(ctx context.Context, req *model.SyncRecordsReq }, nil } -func (c *S3Connector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error { +func (c *S3Connector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string, + flowJobName string, schemaDeltas []*protos.TableSchemaDelta, +) error { c.logger.Info("ReplayTableSchemaDeltas for S3 is a no-op") return nil } diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index ac6f253517..4a76fccd01 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -144,14 +144,14 @@ func TestWriteRecordsToAvroFileHappyPath(t *testing.T) { // Define sample data records, schema := generateRecords(t, true, 10, false) - avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, protos.DBType_SNOWFLAKE) + avroSchema, err := model.GetAvroSchemaDefinition(context.Background(), nil, "not_applicable", schema, protos.DBType_SNOWFLAKE) require.NoError(t, err) t.Logf("[test] avroSchema: %v", avroSchema) // Call function writer := avro.NewPeerDBOCFWriter(records, avroSchema, avro.CompressNone, protos.DBType_SNOWFLAKE) - _, err = writer.WriteRecordsToAvroFile(context.Background(), tmpfile.Name()) + _, err = writer.WriteRecordsToAvroFile(context.Background(), nil, tmpfile.Name()) require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") // Check file is not empty @@ -171,14 +171,14 @@ func TestWriteRecordsToZstdAvroFileHappyPath(t *testing.T) { // Define sample data records, schema := generateRecords(t, true, 10, false) - avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, protos.DBType_SNOWFLAKE) + avroSchema, err := model.GetAvroSchemaDefinition(context.Background(), nil, "not_applicable", schema, protos.DBType_SNOWFLAKE) require.NoError(t, err) t.Logf("[test] avroSchema: %v", avroSchema) // Call function writer := avro.NewPeerDBOCFWriter(records, avroSchema, avro.CompressZstd, protos.DBType_SNOWFLAKE) - _, err = writer.WriteRecordsToAvroFile(context.Background(), tmpfile.Name()) + _, err = writer.WriteRecordsToAvroFile(context.Background(), nil, tmpfile.Name()) require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") // Check file is not empty @@ -198,14 +198,14 @@ func TestWriteRecordsToDeflateAvroFileHappyPath(t *testing.T) { // Define sample data records, schema := generateRecords(t, true, 10, false) - avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, protos.DBType_SNOWFLAKE) + avroSchema, err := model.GetAvroSchemaDefinition(context.Background(), nil, "not_applicable", schema, protos.DBType_SNOWFLAKE) require.NoError(t, err) t.Logf("[test] avroSchema: %v", avroSchema) // Call function writer := avro.NewPeerDBOCFWriter(records, avroSchema, avro.CompressDeflate, protos.DBType_SNOWFLAKE) - _, err = writer.WriteRecordsToAvroFile(context.Background(), tmpfile.Name()) + _, err = writer.WriteRecordsToAvroFile(context.Background(), nil, tmpfile.Name()) require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") // Check file is not empty @@ -224,14 +224,14 @@ func TestWriteRecordsToAvroFileNonNull(t *testing.T) { records, schema := generateRecords(t, false, 10, false) - avroSchema, err := 
model.GetAvroSchemaDefinition("not_applicable", schema, protos.DBType_SNOWFLAKE) + avroSchema, err := model.GetAvroSchemaDefinition(context.Background(), nil, "not_applicable", schema, protos.DBType_SNOWFLAKE) require.NoError(t, err) t.Logf("[test] avroSchema: %v", avroSchema) // Call function writer := avro.NewPeerDBOCFWriter(records, avroSchema, avro.CompressNone, protos.DBType_SNOWFLAKE) - _, err = writer.WriteRecordsToAvroFile(context.Background(), tmpfile.Name()) + _, err = writer.WriteRecordsToAvroFile(context.Background(), nil, tmpfile.Name()) require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") // Check file is not empty @@ -251,14 +251,14 @@ func TestWriteRecordsToAvroFileAllNulls(t *testing.T) { // Define sample data records, schema := generateRecords(t, true, 10, true) - avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, protos.DBType_SNOWFLAKE) + avroSchema, err := model.GetAvroSchemaDefinition(context.Background(), nil, "not_applicable", schema, protos.DBType_SNOWFLAKE) require.NoError(t, err) t.Logf("[test] avroSchema: %v", avroSchema) // Call function writer := avro.NewPeerDBOCFWriter(records, avroSchema, avro.CompressNone, protos.DBType_SNOWFLAKE) - _, err = writer.WriteRecordsToAvroFile(context.Background(), tmpfile.Name()) + _, err = writer.WriteRecordsToAvroFile(context.Background(), nil, tmpfile.Name()) require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") // Check file is not empty diff --git a/flow/connectors/snowflake/merge_stmt_generator.go b/flow/connectors/snowflake/merge_stmt_generator.go index 37b4ed7bdb..d87d3004f7 100644 --- a/flow/connectors/snowflake/merge_stmt_generator.go +++ b/flow/connectors/snowflake/merge_stmt_generator.go @@ -1,6 +1,7 @@ package connsnowflake import ( + "context" "fmt" "strings" @@ -24,7 +25,7 @@ type mergeStmtGenerator struct { mergeBatchId int64 } -func (m *mergeStmtGenerator) generateMergeStmt(dstTable string) (string, error) { +func (m *mergeStmtGenerator) generateMergeStmt(ctx context.Context, env map[string]string, dstTable string) (string, error) { parsedDstTable, _ := utils.ParseSchemaTable(dstTable) normalizedTableSchema := m.tableSchemaMapping[dstTable] unchangedToastColumns := m.unchangedToastColumnsMap[dstTable] @@ -34,7 +35,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(dstTable string) (string, error) for _, column := range columns { genericColumnType := column.Type qvKind := qvalue.QValueKind(genericColumnType) - sfType, err := qvKind.ToDWHColumnType(protos.DBType_SNOWFLAKE) + sfType, err := qvKind.ToDWHColumnType(ctx, env, protos.DBType_SNOWFLAKE, column) if err != nil { return "", fmt.Errorf("failed to convert column type %s to snowflake type: %w", genericColumnType, err) } diff --git a/flow/connectors/snowflake/qrep_avro_consolidate.go b/flow/connectors/snowflake/qrep_avro_consolidate.go index 547aef27ef..a4a8d1a285 100644 --- a/flow/connectors/snowflake/qrep_avro_consolidate.go +++ b/flow/connectors/snowflake/qrep_avro_consolidate.go @@ -4,13 +4,13 @@ import ( "context" "fmt" "log/slog" + "math/rand/v2" "strings" "time" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/peerdbenv" - "github.com/PeerDB-io/peer-flow/shared" ) type SnowflakeAvroConsolidateHandler struct { @@ -214,10 +214,8 @@ func (s *SnowflakeAvroConsolidateHandler) generateUpsertMergeCommand( // handleUpsertMode handles the upsert mode func (s *SnowflakeAvroConsolidateHandler) 
handleUpsertMode(ctx context.Context) error { - runID, err := shared.RandomUInt64() - if err != nil { - return fmt.Errorf("failed to generate run ID: %w", err) - } + //nolint:gosec // number has no cryptographic significance + runID := rand.Uint64() tempTableName := fmt.Sprintf("%s_temp_%d", s.dstTableName, runID) @@ -230,8 +228,8 @@ func (s *SnowflakeAvroConsolidateHandler) handleUpsertMode(ctx context.Context) s.connector.logger.Info("created temp table " + tempTableName) copyCmd := s.getCopyTransformation(tempTableName) - _, err = s.connector.database.ExecContext(ctx, copyCmd) - if err != nil { + + if _, err := s.connector.database.ExecContext(ctx, copyCmd); err != nil { return fmt.Errorf("failed to run COPY INTO command: %w", err) } s.connector.logger.Info("copied file from stage " + s.stage + " to temp table " + tempTableName) diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 0fea54b027..728d393e62 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -48,7 +48,7 @@ func (s *SnowflakeAvroSyncHandler) SyncRecords( s.logger.Info("sync function called and schema acquired", tableLog) - avroSchema, err := s.getAvroSchema(dstTableName, schema) + avroSchema, err := s.getAvroSchema(ctx, env, dstTableName, schema) if err != nil { return 0, err } @@ -98,12 +98,12 @@ func (s *SnowflakeAvroSyncHandler) SyncQRepRecords( schema := stream.Schema() s.logger.Info("sync function called and schema acquired", partitionLog) - err := s.addMissingColumns(ctx, schema, dstTableSchema, dstTableName, partition) + err := s.addMissingColumns(ctx, config.Env, schema, dstTableSchema, dstTableName, partition) if err != nil { return 0, err } - avroSchema, err := s.getAvroSchema(dstTableName, schema) + avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, schema) if err != nil { return 0, err } @@ -130,6 +130,7 @@ func (s *SnowflakeAvroSyncHandler) SyncQRepRecords( func (s *SnowflakeAvroSyncHandler) addMissingColumns( ctx context.Context, + env map[string]string, schema qvalue.QRecordSchema, dstTableSchema []*sql.ColumnType, dstTableName string, @@ -138,7 +139,7 @@ func (s *SnowflakeAvroSyncHandler) addMissingColumns( partitionLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId) // check if avro schema has additional columns compared to destination table // if so, we need to add those columns to the destination table - colsToTypes := map[string]qvalue.QValueKind{} + var newColumns []qvalue.QField for _, col := range schema.Fields { hasColumn := false // check ignoring case @@ -152,24 +153,23 @@ func (s *SnowflakeAvroSyncHandler) addMissingColumns( if !hasColumn { s.logger.Info(fmt.Sprintf("adding column %s to destination table %s", col.Name, dstTableName), partitionLog) - colsToTypes[col.Name] = col.Type + newColumns = append(newColumns, col) } } - if len(colsToTypes) > 0 { + if len(newColumns) > 0 { tx, err := s.database.Begin() if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } - for colName, colType := range colsToTypes { - sfColType, err := colType.ToDWHColumnType(protos.DBType_SNOWFLAKE) + for _, column := range newColumns { + sfColType, err := column.ToDWHColumnType(ctx, env, protos.DBType_SNOWFLAKE) if err != nil { return fmt.Errorf("failed to convert QValueKind to Snowflake column type: %w", err) } - upperCasedColName := strings.ToUpper(colName) - alterTableCmd := fmt.Sprintf("ALTER TABLE %s ", dstTableName) - alterTableCmd += 
fmt.Sprintf("ADD COLUMN IF NOT EXISTS \"%s\" %s;", upperCasedColName, sfColType) + upperCasedColName := strings.ToUpper(column.Name) + alterTableCmd := fmt.Sprintf("ALTER TABLE %s ADD COLUMN IF NOT EXISTS \"%s\" %s;", dstTableName, upperCasedColName, sfColType) s.logger.Info(fmt.Sprintf("altering destination table %s with command `%s`", dstTableName, alterTableCmd), partitionLog) @@ -193,10 +193,12 @@ func (s *SnowflakeAvroSyncHandler) addMissingColumns( } func (s *SnowflakeAvroSyncHandler) getAvroSchema( + ctx context.Context, + env map[string]string, dstTableName string, schema qvalue.QRecordSchema, ) (*model.QRecordAvroSchemaDefinition, error) { - avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, protos.DBType_SNOWFLAKE) + avroSchema, err := model.GetAvroSchemaDefinition(ctx, env, dstTableName, schema, protos.DBType_SNOWFLAKE) if err != nil { return nil, fmt.Errorf("failed to define Avro schema: %w", err) } @@ -223,7 +225,7 @@ func (s *SnowflakeAvroSyncHandler) writeToAvroFile( localFilePath := fmt.Sprintf("%s/%s.avro.zst", tmpDir, partitionID) s.logger.Info("writing records to local file " + localFilePath) - avroFile, err := ocfWriter.WriteRecordsToAvroFile(ctx, localFilePath) + avroFile, err := ocfWriter.WriteRecordsToAvroFile(ctx, env, localFilePath) if err != nil { return nil, fmt.Errorf("failed to write records to Avro file: %w", err) } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 06e3fb881e..124560e90e 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -19,7 +19,6 @@ import ( metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" - numeric "github.com/PeerDB-io/peer-flow/datatypes" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -246,6 +245,7 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch( ctx context.Context, flowJobName string, batchId int64, + tableToSchema map[string]*protos.TableSchema, ) ([]string, error) { rawTableIdentifier := getRawTableIdentifier(flowJobName) @@ -262,7 +262,11 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch( if err := rows.Scan(&result); err != nil { return nil, fmt.Errorf("failed to read row: %w", err) } - destinationTableNames = append(destinationTableNames, result.String) + if _, ok := tableToSchema[result.String]; ok { + destinationTableNames = append(destinationTableNames, result.String) + } else { + c.logger.Warn("table not found in table to schema mapping", "table", result.String) + } } if err := rows.Err(); err != nil { @@ -338,7 +342,7 @@ func (c *SnowflakeConnector) SetupNormalizedTable( return true, nil } - normalizedTableCreateSQL := generateCreateTableSQLForNormalizedTable(config, normalizedSchemaTable, tableSchema) + normalizedTableCreateSQL := generateCreateTableSQLForNormalizedTable(ctx, config, normalizedSchemaTable, tableSchema) if _, err := c.execWithLogging(ctx, normalizedTableCreateSQL); err != nil { return false, fmt.Errorf("[sf] error while creating normalized table: %w", err) } @@ -349,6 +353,7 @@ func (c *SnowflakeConnector) SetupNormalizedTable( // This could involve adding or dropping multiple columns. 
func (c *SnowflakeConnector) ReplayTableSchemaDeltas( ctx context.Context, + env map[string]string, flowJobName string, schemaDeltas []*protos.TableSchemaDelta, ) error { @@ -374,17 +379,12 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas( } for _, addedColumn := range schemaDelta.AddedColumns { - sfColtype, err := qvalue.QValueKind(addedColumn.Type).ToDWHColumnType(protos.DBType_SNOWFLAKE) + sfColtype, err := qvalue.QValueKind(addedColumn.Type).ToDWHColumnType(ctx, env, protos.DBType_SNOWFLAKE, addedColumn) if err != nil { return fmt.Errorf("failed to convert column type %s to snowflake type: %w", addedColumn.Type, err) } - if addedColumn.Type == string(qvalue.QValueKindNumeric) { - precision, scale := numeric.GetNumericTypeForWarehouse(addedColumn.TypeModifier, numeric.SnowflakeNumericCompatibility{}) - sfColtype = fmt.Sprintf("NUMERIC(%d,%d)", precision, scale) - } - _, err = tableSchemaModifyTx.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s ADD COLUMN IF NOT EXISTS \"%s\" %s", schemaDelta.DstTableName, strings.ToUpper(addedColumn.Name), sfColtype)) @@ -460,7 +460,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( return nil, err } - err = c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas) + err = c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas) if err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } @@ -525,7 +525,7 @@ func (c *SnowflakeConnector) mergeTablesForBatch( tableToSchema map[string]*protos.TableSchema, peerdbCols *protos.PeerDBColumns, ) error { - destinationTableNames, err := c.getDistinctTableNamesInBatch(ctx, flowName, batchId) + destinationTableNames, err := c.getDistinctTableNamesInBatch(ctx, flowName, batchId, tableToSchema) if err != nil { return err } @@ -557,7 +557,7 @@ func (c *SnowflakeConnector) mergeTablesForBatch( } g.Go(func() error { - mergeStatement, err := mergeGen.generateMergeStmt(tableName) + mergeStatement, err := mergeGen.generateMergeStmt(gCtx, env, tableName) if err != nil { return err } @@ -666,6 +666,7 @@ func (c *SnowflakeConnector) checkIfTableExists( } func generateCreateTableSQLForNormalizedTable( + ctx context.Context, config *protos.SetupNormalizedTableBatchInput, dstSchemaTable *utils.SchemaTable, tableSchema *protos.TableSchema, @@ -674,18 +675,13 @@ func generateCreateTableSQLForNormalizedTable( for _, column := range tableSchema.Columns { genericColumnType := column.Type normalizedColName := SnowflakeIdentifierNormalize(column.Name) - sfColType, err := qvalue.QValueKind(genericColumnType).ToDWHColumnType(protos.DBType_SNOWFLAKE) + sfColType, err := qvalue.QValueKind(genericColumnType).ToDWHColumnType(ctx, config.Env, protos.DBType_SNOWFLAKE, column) if err != nil { slog.Warn(fmt.Sprintf("failed to convert column type %s to snowflake type", genericColumnType), slog.Any("error", err)) continue } - if genericColumnType == "numeric" { - precision, scale := numeric.GetNumericTypeForWarehouse(column.TypeModifier, numeric.SnowflakeNumericCompatibility{}) - sfColType = fmt.Sprintf("NUMERIC(%d,%d)", precision, scale) - } - var notNull string if tableSchema.NullableEnabled && !column.Nullable { notNull = " NOT NULL" diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index ee72e2c28b..75bc9f4358 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -127,16 +127,21 @@ func (p *peerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error return 
ocfWriter, nil } -func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, ocfWriter *goavro.OCFWriter) (int64, error) { +func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[string]string, ocfWriter *goavro.OCFWriter) (int64, error) { logger := shared.LoggerFromCtx(ctx) schema := p.stream.Schema() - avroConverter := model.NewQRecordAvroConverter( + avroConverter, err := model.NewQRecordAvroConverter( + ctx, + env, p.avroSchema, p.targetDWH, schema.GetColumnNames(), logger, ) + if err != nil { + return 0, err + } numRows := atomic.Int64{} @@ -147,7 +152,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, ocfWriter for qrecord := range p.stream.Records { if err := ctx.Err(); err != nil { - return numRows.Load(), ctx.Err() + return numRows.Load(), err } else { avroMap, err := avroConverter.Convert(qrecord) if err != nil { @@ -172,7 +177,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, ocfWriter return numRows.Load(), nil } -func (p *peerDBOCFWriter) WriteOCF(ctx context.Context, w io.Writer) (int, error) { +func (p *peerDBOCFWriter) WriteOCF(ctx context.Context, env map[string]string, w io.Writer) (int, error) { ocfWriter, err := p.createOCFWriter(w) if err != nil { return 0, fmt.Errorf("failed to create OCF writer: %w", err) @@ -180,7 +185,7 @@ func (p *peerDBOCFWriter) WriteOCF(ctx context.Context, w io.Writer) (int, error // we have to keep a reference to the underlying writer as goavro doesn't provide any access to it defer p.writer.Close() - numRows, err := p.writeRecordsToOCFWriter(ctx, ocfWriter) + numRows, err := p.writeRecordsToOCFWriter(ctx, env, ocfWriter) if err != nil { return 0, fmt.Errorf("failed to write records to OCF writer: %w", err) } @@ -217,7 +222,7 @@ func (p *peerDBOCFWriter) WriteRecordsToS3( } w.Close() }() - numRows, writeOcfError = p.WriteOCF(ctx, w) + numRows, writeOcfError = p.WriteOCF(ctx, env, w) }() partSize, err := peerdbenv.PeerDBS3PartSize(ctx, env) @@ -254,7 +259,7 @@ func (p *peerDBOCFWriter) WriteRecordsToS3( }, nil } -func (p *peerDBOCFWriter) WriteRecordsToAvroFile(ctx context.Context, filePath string) (*AvroFile, error) { +func (p *peerDBOCFWriter) WriteRecordsToAvroFile(ctx context.Context, env map[string]string, filePath string) (*AvroFile, error) { file, err := os.Create(filePath) if err != nil { return nil, fmt.Errorf("failed to create temporary Avro file: %w", err) @@ -275,7 +280,7 @@ func (p *peerDBOCFWriter) WriteRecordsToAvroFile(ctx context.Context, filePath s bufferedWriter := bufio.NewWriterSize(file, buffSizeBytes) defer bufferedWriter.Flush() - numRecords, err := p.WriteOCF(ctx, bufferedWriter) + numRecords, err := p.WriteOCF(ctx, env, bufferedWriter) if err != nil { return nil, fmt.Errorf("failed to write records to temporary Avro file: %w", err) } diff --git a/flow/connectors/utils/azure.go b/flow/connectors/utils/azure.go deleted file mode 100644 index df612b47d3..0000000000 --- a/flow/connectors/utils/azure.go +++ /dev/null @@ -1,15 +0,0 @@ -package utils - -import ( - "errors" - "os" -) - -func GetAzureSubscriptionID() (string, error) { - // get this from env - id := os.Getenv("AZURE_SUBSCRIPTION_ID") - if id == "" { - return "", errors.New("AZURE_SUBSCRIPTION_ID is not set") - } - return id, nil -} diff --git a/flow/datatypes/numeric.go b/flow/datatypes/numeric.go index 56c1b17839..8b942e4f67 100644 --- a/flow/datatypes/numeric.go +++ b/flow/datatypes/numeric.go @@ -90,6 +90,10 @@ func MakeNumericTypmod(precision int32, scale int32) int32 { 
// This is to reverse what make_numeric_typmod of Postgres does: // https://github.com/postgres/postgres/blob/21912e3c0262e2cfe64856e028799d6927862563/src/backend/utils/adt/numeric.c#L897 func ParseNumericTypmod(typmod int32) (int16, int16) { + if typmod == -1 { + return 0, 0 + } + offsetMod := typmod - VARHDRSZ precision := int16((offsetMod >> 16) & 0x7FFF) scale := int16(offsetMod & 0x7FFF) @@ -102,6 +106,14 @@ func GetNumericTypeForWarehouse(typmod int32, warehouseNumeric WarehouseNumericC } precision, scale := ParseNumericTypmod(typmod) + return GetNumericTypeForWarehousePrecisionScale(precision, scale, warehouseNumeric) +} + +func GetNumericTypeForWarehousePrecisionScale(precision int16, scale int16, warehouseNumeric WarehouseNumericCompatibility) (int16, int16) { + if precision == 0 && scale == 0 { + return warehouseNumeric.DefaultPrecisionAndScale() + } + if !IsValidPrecision(precision, warehouseNumeric) { precision = warehouseNumeric.MaxPrecision() } diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 1ee303acf8..ee33f2bfc3 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math/big" + "math/rand/v2" "os" "strings" "testing" @@ -21,7 +22,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" ) type BigQueryTestHelper struct { @@ -37,10 +37,8 @@ type BigQueryTestHelper struct { func NewBigQueryTestHelper(t *testing.T) (*BigQueryTestHelper, error) { t.Helper() // random 64 bit int to namespace stateful schemas. - runID, err := shared.RandomUInt64() - if err != nil { - return nil, fmt.Errorf("failed to generate random uint64: %w", err) - } + //nolint:gosec // number has no cryptographic significance + runID := rand.Uint64() jsonPath := os.Getenv("TEST_BQ_CREDS") if jsonPath == "" { diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 9c4fa2a167..a19e69c8c7 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -4,6 +4,7 @@ import ( "context" "embed" "fmt" + "strconv" "strings" "testing" "time" @@ -11,7 +12,7 @@ import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/require" - "github.com/PeerDB-io/peer-flow/connectors/clickhouse" + connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -557,8 +558,8 @@ func (s ClickHouseSuite) Test_Large_Numeric() { `, srcFullName)) require.NoError(s.t, err) - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1,c2) VALUES(%s,%s);`, srcFullName, strings.Repeat("7", 76), strings.Repeat("9", 78))) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(c1,c2) VALUES($1,$2)", srcFullName), + strings.Repeat("7", 76), strings.Repeat("9", 78)) require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ @@ -568,14 +569,15 @@ func (s ClickHouseSuite) Test_Large_Numeric() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) flowConnConfig.DoInitialSnapshot = true + tc := e2e.NewTemporalClient(s.t) env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) e2e.EnvWaitForCount(env, s, "waiting 
for CDC count", dstTableName, "id,c1,c2", 1) - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1,c2) VALUES(%s,%s);`, srcFullName, strings.Repeat("7", 76), strings.Repeat("9", 78))) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(c1,c2) VALUES($1,$2)", srcFullName), + strings.Repeat("7", 76), strings.Repeat("9", 78)) require.NoError(s.t, err) e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c1,c2", 2) @@ -598,3 +600,67 @@ func (s ClickHouseSuite) Test_Large_Numeric() { env.Cancel() e2e.RequireEnvCanceled(s.t, env) } + +// Unbounded NUMERICs (no precision, scale specified) are mapped to String on CH if FF enabled, Decimal if not +func (s ClickHouseSuite) testNumericFF(ffValue bool) { + nines := strings.Repeat("9", 38) + dstTableName := fmt.Sprintf("unumeric_ff_%v", ffValue) + srcFullName := s.attachSchemaSuffix(dstTableName) + + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s( + id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + c numeric + ); + `, srcFullName)) + require.NoError(s.t, err) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(c) VALUES($1)", srcFullName), nines) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix(fmt.Sprintf("clickhouse_test_unbounded_numerics_ff_%v", ffValue)), + TableNameMapping: map[string]string{srcFullName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) + flowConnConfig.DoInitialSnapshot = true + flowConnConfig.Env = map[string]string{"PEERDB_CLICKHOUSE_UNBOUNDED_NUMERIC_AS_STRING": strconv.FormatBool(ffValue)} + tc := e2e.NewTemporalClient(s.t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c", 1) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(c) VALUES($1)", srcFullName), nines) + require.NoError(s.t, err) + + e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c", 2) + + rows, err := s.GetRows(dstTableName, "c") + require.NoError(s.t, err) + require.Len(s.t, rows.Records, 2, "expected 2 rows") + for _, row := range rows.Records { + require.Len(s.t, row, 1, "expected 1 column") + if ffValue { + c, ok := row[0].Value().(string) + require.True(s.t, ok, "expected unbounded NUMERIC to be String") + require.Equal(s.t, nines, c, "expected unbounded NUMERIC to be 9s") + } else { + c, ok := row[0].Value().(decimal.Decimal) + require.True(s.t, ok, "expected unbounded NUMERIC to be Decimal") + require.Equal(s.t, nines, c.String(), "expected unbounded NUMERIC to be 9s") + } + } + + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) +} + +func (s ClickHouseSuite) Test_Unbounded_Numeric_With_FF() { + s.testNumericFF(true) +} + +func (s ClickHouseSuite) Test_Unbounded_Numeric_Without_FF() { + s.testNumericFF(false) +} diff --git a/flow/e2e/snowflake/snowflake_helper.go b/flow/e2e/snowflake/snowflake_helper.go index ca57b5b473..7e2943e3bc 100644 --- a/flow/e2e/snowflake/snowflake_helper.go +++ b/flow/e2e/snowflake/snowflake_helper.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "math/rand/v2" "os" "testing" @@ -13,7 +14,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - 
"github.com/PeerDB-io/peer-flow/shared" ) type SnowflakeTestHelper struct { @@ -47,11 +47,8 @@ func NewSnowflakeTestHelper(t *testing.T) (*SnowflakeTestHelper, error) { return nil, fmt.Errorf("failed to unmarshal json: %w", err) } - runID, err := shared.RandomUInt64() - if err != nil { - return nil, fmt.Errorf("failed to generate random uint64: %w", err) - } - + //nolint:gosec // number has no cryptographic significance + runID := rand.Uint64() testDatabaseName := fmt.Sprintf("e2e_test_%d", runID) adminClient, err := connsnowflake.NewSnowflakeClient(context.Background(), config) diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index 32cb03b644..ada2b10f6a 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -53,7 +53,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { err := s.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName)) require.NoError(s.t, err) - err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{ + err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: []*protos.FieldDescription{ @@ -167,7 +167,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() { } } - err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{ + err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, @@ -246,7 +246,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() { } } - err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{ + err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, @@ -301,7 +301,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { } } - err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{ + err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, diff --git a/flow/e2e/sqlserver/sqlserver_helper.go b/flow/e2e/sqlserver/sqlserver_helper.go index 056922800c..d3e1401f24 100644 --- a/flow/e2e/sqlserver/sqlserver_helper.go +++ b/flow/e2e/sqlserver/sqlserver_helper.go @@ -3,6 +3,7 @@ package e2e_sqlserver import ( "context" "fmt" + "math/rand/v2" "os" "strconv" @@ -10,7 +11,6 @@ import ( connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" ) type SQLServerHelper struct { @@ -45,11 +45,8 @@ func NewSQLServerHelper() (*SQLServerHelper, error) { return nil, fmt.Errorf("invalid connection configs: %v", connErr) } - rndNum, err := shared.RandomUInt64() - if err != nil { - return nil, err - } - + //nolint:gosec // number has no cryptographic significance + rndNum := rand.Uint64() testSchema := fmt.Sprintf("e2e_test_%d", rndNum) if err := 
connector.CreateSchema(context.Background(), testSchema); err != nil { return nil, err diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index 8f52c44611..ec7cfc6e37 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -1,6 +1,7 @@ package model import ( + "context" "encoding/json" "fmt" @@ -8,38 +9,52 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/peerdbenv" ) type QRecordAvroConverter struct { - logger log.Logger - Schema *QRecordAvroSchemaDefinition - ColNames []string - TargetDWH protos.DBType + logger log.Logger + Schema *QRecordAvroSchemaDefinition + ColNames []string + TargetDWH protos.DBType + UnboundedNumericAsString bool } func NewQRecordAvroConverter( + ctx context.Context, + env map[string]string, schema *QRecordAvroSchemaDefinition, targetDWH protos.DBType, colNames []string, logger log.Logger, -) *QRecordAvroConverter { - return &QRecordAvroConverter{ - Schema: schema, - TargetDWH: targetDWH, - ColNames: colNames, - logger: logger, +) (*QRecordAvroConverter, error) { + var unboundedNumericAsString bool + if targetDWH == protos.DBType_CLICKHOUSE { + var err error + unboundedNumericAsString, err = peerdbenv.PeerDBEnableClickHouseNumericAsString(ctx, env) + if err != nil { + return nil, err + } } -} -func (qac *QRecordAvroConverter) Convert(qrecord []qvalue.QValue) (map[string]interface{}, error) { - m := make(map[string]interface{}, len(qrecord)) + return &QRecordAvroConverter{ + Schema: schema, + TargetDWH: targetDWH, + ColNames: colNames, + logger: logger, + UnboundedNumericAsString: unboundedNumericAsString, + }, nil +} +func (qac *QRecordAvroConverter) Convert(qrecord []qvalue.QValue) (map[string]any, error) { + m := make(map[string]any, len(qrecord)) for idx, val := range qrecord { avroVal, err := qvalue.QValueToAvro( val, &qac.Schema.Fields[idx], qac.TargetDWH, qac.logger, + qac.UnboundedNumericAsString, ) if err != nil { return nil, fmt.Errorf("failed to convert QValue to Avro-compatible value: %w", err) @@ -52,8 +67,8 @@ func (qac *QRecordAvroConverter) Convert(qrecord []qvalue.QValue) (map[string]in } type QRecordAvroField struct { - Type interface{} `json:"type"` - Name string `json:"name"` + Type any `json:"type"` + Name string `json:"name"` } type QRecordAvroSchema struct { @@ -68,6 +83,8 @@ type QRecordAvroSchemaDefinition struct { } func GetAvroSchemaDefinition( + ctx context.Context, + env map[string]string, dstTableName string, qRecordSchema qvalue.QRecordSchema, targetDWH protos.DBType, @@ -75,7 +92,7 @@ func GetAvroSchemaDefinition( avroFields := make([]QRecordAvroField, 0, len(qRecordSchema.Fields)) for _, qField := range qRecordSchema.Fields { - avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type, targetDWH, qField.Precision, qField.Scale) + avroType, err := qvalue.GetAvroSchemaFromQValueKind(ctx, env, qField.Type, targetDWH, qField.Precision, qField.Scale) if err != nil { return nil, err } diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 97d9641b6b..db5bf4e2af 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -1,6 +1,7 @@ package qvalue import ( + "context" "encoding/base64" "errors" "fmt" @@ -14,6 +15,7 @@ import ( "github.com/PeerDB-io/peer-flow/datatypes" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/peerdbenv" ) type AvroSchemaField struct { @@ -74,7 +76,14 @@ func 
TruncateOrLogNumeric(num decimal.Decimal, precision int16, scale int16, tar // // For example, QValueKindInt64 would return an AvroLogicalSchema of "long". Unsupported QValueKinds // will return an error. -func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, precision int16, scale int16) (interface{}, error) { +func GetAvroSchemaFromQValueKind( + ctx context.Context, + env map[string]string, + kind QValueKind, + targetDWH protos.DBType, + precision int16, + scale int16, +) (interface{}, error) { switch kind { case QValueKindString: return "string", nil @@ -103,9 +112,19 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci } return "bytes", nil case QValueKindNumeric: - if targetDWH == protos.DBType_CLICKHOUSE && - precision > datatypes.PeerDBClickHouseMaxPrecision { - return "string", nil + if targetDWH == protos.DBType_CLICKHOUSE { + if precision == 0 && scale == 0 { + asString, err := peerdbenv.PeerDBEnableClickHouseNumericAsString(ctx, env) + if err != nil { + return nil, err + } + if asString { + return "string", nil + } + } + if precision > datatypes.PeerDBClickHouseMaxPrecision { + return "string", nil + } } avroNumericPrecision, avroNumericScale := DetermineNumericSettingForDWH(precision, scale, targetDWH) return AvroSchemaNumeric{ @@ -212,19 +231,24 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci type QValueAvroConverter struct { *QField - logger log.Logger - TargetDWH protos.DBType + logger log.Logger + TargetDWH protos.DBType + UnboundedNumericAsString bool } -func QValueToAvro(value QValue, field *QField, targetDWH protos.DBType, logger log.Logger) (interface{}, error) { +func QValueToAvro( + value QValue, field *QField, targetDWH protos.DBType, logger log.Logger, + unboundedNumericAsString bool, +) (any, error) { if value.Value() == nil { return nil, nil } - c := &QValueAvroConverter{ - QField: field, - TargetDWH: targetDWH, - logger: logger, + c := QValueAvroConverter{ + QField: field, + TargetDWH: targetDWH, + logger: logger, + UnboundedNumericAsString: unboundedNumericAsString, } switch v := value.(type) { @@ -456,18 +480,18 @@ func (c *QValueAvroConverter) processNullableUnion( return value, nil } -func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} { +func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) any { + if (c.UnboundedNumericAsString && c.Precision == 0 && c.Scale == 0) || + (c.TargetDWH == protos.DBType_CLICKHOUSE && c.Precision > datatypes.PeerDBClickHouseMaxPrecision) { + numStr, _ := c.processNullableUnion("string", num.String()) + return numStr + } + num, err := TruncateOrLogNumeric(num, c.Precision, c.Scale, c.TargetDWH) if err != nil { return nil } - if c.TargetDWH == protos.DBType_CLICKHOUSE && - c.Precision > datatypes.PeerDBClickHouseMaxPrecision { - // no error returned - numStr, _ := c.processNullableUnion("string", num.String()) - return numStr - } rat := num.Rat() if c.Nullable { return goavro.Union("bytes.decimal", rat) diff --git a/flow/model/qvalue/dwh.go b/flow/model/qvalue/dwh.go index 49c359b885..b2d085acb4 100644 --- a/flow/model/qvalue/dwh.go +++ b/flow/model/qvalue/dwh.go @@ -5,24 +5,24 @@ import ( "go.temporal.io/sdk/log" - numeric "github.com/PeerDB-io/peer-flow/datatypes" + "github.com/PeerDB-io/peer-flow/datatypes" "github.com/PeerDB-io/peer-flow/generated/protos" ) func DetermineNumericSettingForDWH(precision int16, scale int16, dwh protos.DBType) (int16, int16) { - var warehouseNumeric 
numeric.WarehouseNumericCompatibility + var warehouseNumeric datatypes.WarehouseNumericCompatibility switch dwh { case protos.DBType_CLICKHOUSE: - warehouseNumeric = numeric.ClickHouseNumericCompatibility{} + warehouseNumeric = datatypes.ClickHouseNumericCompatibility{} case protos.DBType_SNOWFLAKE: - warehouseNumeric = numeric.SnowflakeNumericCompatibility{} + warehouseNumeric = datatypes.SnowflakeNumericCompatibility{} case protos.DBType_BIGQUERY: - warehouseNumeric = numeric.BigQueryNumericCompatibility{} + warehouseNumeric = datatypes.BigQueryNumericCompatibility{} default: - warehouseNumeric = numeric.DefaultNumericCompatibility{} + warehouseNumeric = datatypes.DefaultNumericCompatibility{} } - return numeric.GetNumericTypeForWarehouse(numeric.MakeNumericTypmod(int32(precision), int32(scale)), warehouseNumeric) + return datatypes.GetNumericTypeForWarehousePrecisionScale(precision, scale, warehouseNumeric) } // Bigquery will not allow timestamp if it is less than 1AD and more than 9999AD diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 91ab867a0e..3cffcc274a 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -1,10 +1,13 @@ package qvalue import ( + "context" "fmt" "strings" + "github.com/PeerDB-io/peer-flow/datatypes" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/peerdbenv" ) type QValueKind string @@ -68,7 +71,6 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindInt64: "INTEGER", QValueKindFloat32: "FLOAT", QValueKindFloat64: "FLOAT", - QValueKindNumeric: "NUMBER(38, 9)", QValueKindQChar: "CHAR", QValueKindString: "STRING", QValueKindJSON: "VARIANT", @@ -110,7 +112,6 @@ var QValueKindToClickHouseTypeMap = map[QValueKind]string{ QValueKindInt64: "Int64", QValueKindFloat32: "Float32", QValueKindFloat64: "Float64", - QValueKindNumeric: "Decimal128(9)", QValueKindQChar: "FixedString(1)", QValueKindString: "String", QValueKindJSON: "String", @@ -140,16 +141,39 @@ var QValueKindToClickHouseTypeMap = map[QValueKind]string{ QValueKindArrayJSONB: "String", } -func (kind QValueKind) ToDWHColumnType(dwhType protos.DBType) (string, error) { +func getClickHouseTypeForNumericColumn(ctx context.Context, env map[string]string, column *protos.FieldDescription) (string, error) { + if column.TypeModifier == -1 { + numericAsStringEnabled, err := peerdbenv.PeerDBEnableClickHouseNumericAsString(ctx, env) + if err != nil { + return "", err + } + if numericAsStringEnabled { + return "String", nil + } + } else if rawPrecision, _ := datatypes.ParseNumericTypmod(column.TypeModifier); rawPrecision > datatypes.PeerDBClickHouseMaxPrecision { + return "String", nil + } + precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{}) + return fmt.Sprintf("Decimal(%d, %d)", precision, scale), nil +} + +// SEE ALSO: QField ToDWHColumnType +func (kind QValueKind) ToDWHColumnType(ctx context.Context, env map[string]string, dwhType protos.DBType, column *protos.FieldDescription, +) (string, error) { switch dwhType { case protos.DBType_SNOWFLAKE: - if val, ok := QValueKindToSnowflakeTypeMap[kind]; ok { + if kind == QValueKindNumeric { + precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.SnowflakeNumericCompatibility{}) + return fmt.Sprintf("NUMERIC(%d,%d)", precision, scale), nil + } else if val, ok := QValueKindToSnowflakeTypeMap[kind]; ok { return val, nil } else { return "STRING", nil } case protos.DBType_CLICKHOUSE: - if 
val, ok := QValueKindToClickHouseTypeMap[kind]; ok { + if kind == QValueKindNumeric { + return getClickHouseTypeForNumericColumn(ctx, env, column) + } else if val, ok := QValueKindToClickHouseTypeMap[kind]; ok { return val, nil } else { return "String", nil diff --git a/flow/model/qvalue/qschema.go b/flow/model/qvalue/qschema.go index a956968ac1..a6632fdf5f 100644 --- a/flow/model/qvalue/qschema.go +++ b/flow/model/qvalue/qschema.go @@ -1,7 +1,13 @@ package qvalue import ( + "context" + "fmt" "strings" + + "github.com/PeerDB-io/peer-flow/datatypes" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/peerdbenv" ) type QField struct { @@ -47,3 +53,42 @@ func (q QRecordSchema) GetColumnNames() []string { } return names } + +func (q QField) getClickHouseTypeForNumericField(ctx context.Context, env map[string]string) (string, error) { + if q.Precision == 0 && q.Scale == 0 { + numericAsStringEnabled, err := peerdbenv.PeerDBEnableClickHouseNumericAsString(ctx, env) + if err != nil { + return "", err + } + if numericAsStringEnabled { + return "String", nil + } + } else if q.Precision > datatypes.PeerDBClickHouseMaxPrecision { + return "String", nil + } + return fmt.Sprintf("Decimal(%d, %d)", q.Precision, q.Scale), nil +} + +// SEE ALSO: qvalue/kind.go ToDWHColumnType +func (q QField) ToDWHColumnType(ctx context.Context, env map[string]string, dwhType protos.DBType) (string, error) { + switch dwhType { + case protos.DBType_SNOWFLAKE: + if val, ok := QValueKindToSnowflakeTypeMap[q.Type]; ok { + return val, nil + } else if q.Type == QValueKindNumeric { + return fmt.Sprintf("NUMERIC(%d,%d)", q.Precision, q.Scale), nil + } else { + return "STRING", nil + } + case protos.DBType_CLICKHOUSE: + if val, ok := QValueKindToClickHouseTypeMap[q.Type]; ok { + return val, nil + } else if q.Type == QValueKindNumeric { + return q.getClickHouseTypeForNumericField(ctx, env) + } else { + return "String", nil + } + default: + return "", fmt.Errorf("unknown dwh type: %v", dwhType) + } +} diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 9aa9d2c5ed..79be60ae40 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" ) @@ -41,7 +42,7 @@ func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration { if providedValue > 0 { x = providedValue } else { - x = getEnvInt("", 10) + x = getEnvConvert("", 10, strconv.Atoi) } return time.Duration(x) * time.Second } @@ -133,6 +134,10 @@ func PeerDBAllowedTargets() string { return GetEnvString("PEERDB_ALLOWED_TARGETS", "") } +func PeerDBOnlyClickHouseAllowed() bool { + return strings.EqualFold(PeerDBAllowedTargets(), protos.DBType_CLICKHOUSE.String()) +} + func PeerDBClickHouseAllowedDomains() string { return GetEnvString("PEERDB_CLICKHOUSE_ALLOWED_DOMAINS", "") } @@ -170,5 +175,5 @@ func PeerDBRAPIRequestLoggingEnabled() bool { // PEERDB_MAINTENANCE_MODE_WAIT_ALERT_SECONDS tells how long to wait before alerting that peerdb has been stuck in maintenance mode // for too long func PeerDBMaintenanceModeWaitAlertSeconds() int { - return getEnvInt("PEERDB_MAINTENANCE_MODE_WAIT_ALERT_SECONDS", 600) + return getEnvConvert("PEERDB_MAINTENANCE_MODE_WAIT_ALERT_SECONDS", 600, strconv.Atoi) } diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index b0cbe05f51..98a47d8fdc 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -187,6 +187,14 @@
var DynamicSettings = [...]*protos.DynamicSetting{ ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_CLICKHOUSE, }, + { + Name: "PEERDB_CLICKHOUSE_UNBOUNDED_NUMERIC_AS_STRING", + Description: "Map unbounded numerics in Postgres to String in ClickHouse to preserve precision and scale", + DefaultValue: "false", + ValueType: protos.DynconfValueType_BOOL, + ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, + TargetForSetting: protos.DynconfTarget_CLICKHOUSE, + }, { Name: "PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES", Description: "Duration in minutes since last normalize to start alerting, 0 disables all alerting entirely", @@ -389,6 +397,10 @@ func PeerDBClickHouseParallelNormalize(ctx context.Context, env map[string]strin return dynamicConfSigned[int](ctx, env, "PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE") } +func PeerDBEnableClickHouseNumericAsString(ctx context.Context, env map[string]string) (bool, error) { + return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_UNBOUNDED_NUMERIC_AS_STRING") +} + func PeerDBSnowflakeMergeParallelism(ctx context.Context, env map[string]string) (int64, error) { return dynamicConfSigned[int64](ctx, env, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM") } diff --git a/flow/peerdbenv/env.go b/flow/peerdbenv/env.go index ced2adeab6..14e118e7a5 100644 --- a/flow/peerdbenv/env.go +++ b/flow/peerdbenv/env.go @@ -20,22 +20,6 @@ const ( KmsKeyIDEnvVar = "PEERDB_KMS_KEY_ID" ) -// GetEnvInt returns the value of the environment variable with the given name -// or defaultValue if the environment variable is not set or is not a valid value. -func getEnvInt(name string, defaultValue int) int { - val, ok := os.LookupEnv(name) - if !ok { - return defaultValue - } - - i, err := strconv.Atoi(val) - if err != nil { - return defaultValue - } - - return i -} - // getEnvUint returns the value of the environment variable with the given name // or defaultValue if the environment variable is not set or is not a valid value. func getEnvUint[T constraints.Unsigned](name string, defaultValue T) T { @@ -64,6 +48,19 @@ func GetEnvString(name string, defaultValue string) string { return val } +func getEnvConvert[T any](name string, defaultValue T, convert func(string) (T, error)) T { + val, ok := os.LookupEnv(name) + if !ok { + return defaultValue + } + + ret, err := convert(val) + if err != nil { + return defaultValue + } + return ret +} + func decryptWithKms(ctx context.Context, data []byte) ([]byte, error) { keyID, exists := os.LookupEnv(KmsKeyIDEnvVar) if !exists { diff --git a/flow/shared/random.go b/flow/shared/random.go index 7ef3c8e5dc..84830f3762 100644 --- a/flow/shared/random.go +++ b/flow/shared/random.go @@ -2,32 +2,8 @@ package shared import ( "crypto/rand" - "encoding/binary" - "errors" ) -// RandomInt64 returns a random 64 bit integer. -func RandomInt64() (int64, error) { - b := make([]byte, 8) - _, err := rand.Read(b) - if err != nil { - return 0, errors.New("could not generate random int64: " + err.Error()) - } - // Convert bytes to int64 - return int64(binary.LittleEndian.Uint64(b)), nil -} - -// RandomUInt64 returns a random 64 bit unsigned integer. 
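A short usage sketch (not part of the diff): the new generic getEnvConvert helper in flow/peerdbenv/env.go generalizes the removed getEnvInt to any type with a string parser, so the same lookup-with-default pattern can serve ints, bools, and other settings. The sketch assumes it lives inside package peerdbenv next to getEnvConvert; the setting names are hypothetical and used only for illustration.

// hypothetical settings, shown only to demonstrate getEnvConvert with different parsers;
// the helper falls back to the default when the variable is unset or fails to parse
func examplePollIntervalSeconds() int {
	return getEnvConvert("PEERDB_EXAMPLE_POLL_INTERVAL_SECONDS", 10, strconv.Atoi)
}

func exampleVerboseLogging() bool {
	return getEnvConvert("PEERDB_EXAMPLE_VERBOSE_LOGGING", false, strconv.ParseBool)
}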
-func RandomUInt64() (uint64, error) { - b := make([]byte, 8) - _, err := rand.Read(b) - if err != nil { - return 0, errors.New("could not generate random uint64: " + err.Error()) - } - // Convert bytes to uint64 - return binary.LittleEndian.Uint64(b), nil -} - func RandomString(n int) string { const alphanum = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" bytes := make([]byte, n) diff --git a/flow/shared/worklow.go b/flow/shared/workflow.go similarity index 100% rename from flow/shared/worklow.go rename to flow/shared/workflow.go diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 9b21b7b384..1db3b6d60b 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -208,6 +208,7 @@ func (s *SnapshotFlowExecution) cloneTable( WriteMode: snapshotWriteMode, System: s.config.System, Script: s.config.Script, + Env: s.config.Env, ParentMirrorName: flowName, } diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index 15a8c32743..9cd70344bb 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -785,9 +785,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" [[package]] name = "bytes-utils" @@ -801,9 +801,9 @@ dependencies = [ [[package]] name = "cargo-deb" -version = "2.9.0" +version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9103cb60c68ef7ce14a3d17c6d697e8b180356a447685784f7951074bce0b844" +checksum = "db0e12dd59626cd2543903f1b794135b1f6e0df1003dd3be1071c06961bf6072" dependencies = [ "ar", "cargo_toml", @@ -1500,7 +1500,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "thiserror", + "thiserror 1.0.68", "time", "tokio", "tokio-stream", @@ -1790,7 +1790,7 @@ dependencies = [ "http 1.1.0", "hyper 1.5.0", "hyper-util", - "rustls 0.23.18", + "rustls 0.23.19", "rustls-native-certs 0.8.0", "rustls-pki-types", "tokio", @@ -2361,7 +2361,7 @@ dependencies = [ "serde", "serde_json", "socket2", - "thiserror", + "thiserror 1.0.68", "tokio", "tokio-rustls 0.25.0", "tokio-util", @@ -2401,7 +2401,7 @@ dependencies = [ "sha2", "smallvec", "subprocess", - "thiserror", + "thiserror 1.0.68", "uuid", "zstd", ] @@ -2893,9 +2893,9 @@ dependencies = [ [[package]] name = "pgwire" -version = "0.26.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99e0f273b9ffa92a06b0a900c012df432de901c1854b2411cd7b27e2db165cc8" +checksum = "f8e3b217978f9e224cfd5e2b272064067e793a39744030e49657c699752473c8" dependencies = [ "async-trait", "base64 0.22.1", @@ -2911,7 +2911,7 @@ dependencies = [ "ring", "rust_decimal", "stringprep", - "thiserror", + "thiserror 2.0.3", "tokio", "tokio-rustls 0.26.0", "tokio-util", @@ -3044,7 +3044,7 @@ dependencies = [ "anyhow", "futures-util", "pt", - "rustls 0.23.18", + "rustls 0.23.19", "ssh2", "tokio", "tokio-postgres", @@ -3159,7 +3159,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", - "heck 0.4.1", + "heck 0.5.0", "itertools", "log", "multimap", @@ -3270,9 +3270,9 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash 2.0.0", - "rustls 0.23.18", + "rustls 0.23.19", "socket2", - 
"thiserror", + "thiserror 1.0.68", "tokio", "tracing", ] @@ -3287,9 +3287,9 @@ dependencies = [ "rand", "ring", "rustc-hash 2.0.0", - "rustls 0.23.18", + "rustls 0.23.19", "slab", - "thiserror", + "thiserror 1.0.68", "tinyvec", "tracing", ] @@ -3412,7 +3412,7 @@ dependencies = [ "log", "regex", "siphasher 1.0.1", - "thiserror", + "thiserror 1.0.68", "time", "tokio", "tokio-postgres", @@ -3518,7 +3518,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.18", + "rustls 0.23.19", "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", @@ -3686,9 +3686,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.18" +version = "0.23.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" +checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1" dependencies = [ "log", "once_cell", @@ -4029,7 +4029,7 @@ checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085" dependencies = [ "num-bigint", "num-traits", - "thiserror", + "thiserror 1.0.68", "time", ] @@ -4245,7 +4245,16 @@ version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02dd99dc800bbb97186339685293e1cc5d9df1f8fae2d0aecd9ff1c77efea892" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.68", +] + +[[package]] +name = "thiserror" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" +dependencies = [ + "thiserror-impl 2.0.3", ] [[package]] @@ -4259,6 +4268,17 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "thiserror-impl" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "thread_local" version = "1.1.8" @@ -4411,7 +4431,7 @@ checksum = "27d684bad428a0f2481f42241f821db42c54e2dc81d8c00db8536c506b0a0144" dependencies = [ "const-oid", "ring", - "rustls 0.23.18", + "rustls 0.23.19", "tokio", "tokio-postgres", "tokio-rustls 0.26.0", @@ -4445,7 +4465,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.18", + "rustls 0.23.19", "rustls-pki-types", "tokio", ] @@ -4630,9 +4650,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", "tracing-attributes", @@ -4646,16 +4666,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" dependencies = [ "crossbeam-channel", - "thiserror", + "thiserror 1.0.68", "time", "tracing-subscriber", ] [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" 
dependencies = [ "proc-macro2", "quote", @@ -4664,9 +4684,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", "valuable", @@ -4685,9 +4705,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.18" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ "matchers", "nu-ansi-term", @@ -4781,21 +4801,24 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "ureq" -version = "2.10.1" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b74fc6b57825be3373f7054754755f03ac3a8f5d70015ccad699ba2029956f4a" +checksum = "b30e6f97efe1fa43535ee241ee76967d3ff6ff3953ebb430d8d55c5393029e7b" dependencies = [ "base64 0.22.1", "encoding_rs", "flate2", + "litemap", "log", "once_cell", - "rustls 0.23.18", + "rustls 0.23.19", "rustls-pki-types", "serde", "serde_json", "url", "webpki-roots", + "yoke", + "zerofrom", ] [[package]] @@ -5285,9 +5308,9 @@ dependencies = [ [[package]] name = "x509-certificate" -version = "0.23.1" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66534846dec7a11d7c50a74b7cdb208b9a581cad890b7866430d438455847c85" +checksum = "e57b9f8bcae7c1f36479821ae826d75050c60ce55146fd86d3553ed2573e2762" dependencies = [ "bcder", "bytes", @@ -5298,7 +5321,7 @@ dependencies = [ "ring", "signature", "spki", - "thiserror", + "thiserror 1.0.68", "zeroize", ] @@ -5358,7 +5381,7 @@ dependencies = [ "hyper-util", "log", "percent-encoding", - "rustls 0.23.18", + "rustls 0.23.19", "rustls-pemfile 2.2.0", "seahash", "serde", diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 6efea5f4b2..5f5d1b3e6a 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -32,7 +32,7 @@ ssh2 = "0.9" sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", branch = "main" } tokio = { version = "1", features = ["full"] } tracing = "0.1" -pgwire = { version = "0.26", default-features = false, features = [ +pgwire = { version = "0.27", default-features = false, features = [ "scram", "server-api-ring", ] } diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 830da627de..cc9309c6b3 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -48,7 +48,7 @@ pub enum QueryAssociation { Catalog, } -impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> { +impl StatementAnalyzer for PeerExistanceAnalyzer<'_> { type Output = QueryAssociation; fn analyze(&self, statement: &Statement) -> anyhow::Result { diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index d5d023e571..4cb2512c40 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -51,7 +51,7 @@ pub struct CatalogConfig<'a> { pub database: &'a str, } -impl<'a> CatalogConfig<'a> { +impl CatalogConfig<'_> { // convert catalog config to PostgresConfig pub fn to_postgres_config(&self) -> pt::peerdb_peers::PostgresConfig { PostgresConfig { diff --git a/nexus/peer-postgres/src/stream.rs b/nexus/peer-postgres/src/stream.rs index 230d2dca7d..6d2f6a6e1d 100644 --- 
a/nexus/peer-postgres/src/stream.rs +++ b/nexus/peer-postgres/src/stream.rs @@ -186,6 +186,12 @@ fn values_from_row(row: &Row) -> Vec { let uuid: Option = row.get(i); uuid.map(Value::Uuid).unwrap_or(Value::Null) } + &Type::UUID_ARRAY => { + let uuid: Option> = row.get(i); + uuid.map(ArrayValue::Uuid) + .map(Value::Array) + .unwrap_or(Value::Null) + } &Type::INET | &Type::CIDR => { let s: Option = row.get(i); s.map(Value::IpAddr).unwrap_or(Value::Null) diff --git a/nexus/value/src/array.rs b/nexus/value/src/array.rs index 2fa299bb34..9117dbf568 100644 --- a/nexus/value/src/array.rs +++ b/nexus/value/src/array.rs @@ -4,6 +4,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use chrono::{DateTime, NaiveDate, NaiveTime, Utc}; use pgwire::types::ToSqlText; use postgres_types::{IsNull, ToSql, Type}; +use uuid::{Uuid, fmt::Hyphenated}; #[derive(Debug, PartialEq, Clone)] pub enum ArrayValue { @@ -21,6 +22,7 @@ pub enum ArrayValue { Text(Vec), Binary(Vec), VarBinary(Vec), + Uuid(Vec), Date(Vec), Time(Vec), TimeWithTimeZone(Vec), @@ -94,6 +96,11 @@ impl ArrayValue { .map(|v| serde_json::Value::String(hex::encode(v))) .collect(), ), + ArrayValue::Uuid(arr) => serde_json::Value::Array( + arr.iter() + .map(|v| serde_json::Value::String(v.to_string())) + .collect(), + ), ArrayValue::Date(arr) => serde_json::Value::Array( arr.iter() .map(|&v| serde_json::Value::String(v.to_string())) @@ -151,6 +158,7 @@ impl ToSql for ArrayValue { ArrayValue::Text(arr) => arr.to_sql(ty, out)?, ArrayValue::Binary(_arr) => todo!("support encoding array of binary"), ArrayValue::VarBinary(_arr) => todo!("support encoding array of varbinary"), + ArrayValue::Uuid(arr) => arr.to_sql(ty, out)?, ArrayValue::Date(arr) => arr.to_sql(ty, out)?, ArrayValue::Time(arr) => arr.to_sql(ty, out)?, ArrayValue::TimeWithTimeZone(arr) => arr.to_sql(ty, out)?, @@ -229,6 +237,14 @@ impl ToSqlText for ArrayValue { ArrayValue::Text(arr) => array_to_sql_text!(arr, ty, out), ArrayValue::Binary(_arr) => todo!("implement encoding array of binary"), ArrayValue::VarBinary(_arr) => todo!("implement encoding array of varbinary"), + ArrayValue::Uuid(arr) => { + let mut buf = [0u8; Hyphenated::LENGTH]; + for v in arr { + out.put_slice(b"'"); + out.put_slice(v.hyphenated().encode_lower(&mut buf).as_bytes()); + out.put_slice(b"',"); + } + } ArrayValue::Date(arr) => array_to_sql_text!(arr, ty, out), ArrayValue::Time(arr) => array_to_sql_text!(arr, ty, out), ArrayValue::TimeWithTimeZone(arr) => array_to_sql_text!(arr, ty, out), diff --git a/stacks/flow.Dockerfile b/stacks/flow.Dockerfile index 2c6f375d0e..4595e45b3f 100644 --- a/stacks/flow.Dockerfile +++ b/stacks/flow.Dockerfile @@ -1,4 +1,4 @@ -# syntax=docker/dockerfile:1.11@sha256:10c699f1b6c8bdc8f6b4ce8974855dd8542f1768c26eb240237b8f1c9c6c9976 +# syntax=docker/dockerfile:1.12@sha256:db1ff77fb637a5955317c7a3a62540196396d565f3dd5742e76dddbb6d75c4c5 FROM golang:1.23-alpine@sha256:c694a4d291a13a9f9d94933395673494fc2cc9d4777b85df3a7e70b3492d3574 AS builder RUN apk add --no-cache gcc geos-dev musl-dev diff --git a/stacks/peerdb-server.Dockerfile b/stacks/peerdb-server.Dockerfile index 16f0a58eaa..497b3aa7c9 100644 --- a/stacks/peerdb-server.Dockerfile +++ b/stacks/peerdb-server.Dockerfile @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:1@sha256:865e5dd094beca432e8c0a1d5e1c465db5f998dca4e439981029b3b81fb39ed5 -FROM lukemathwalker/cargo-chef:latest-rust-alpine3.20@sha256:a539f69c0a6b9d328b398f1e7aed81d53e986b49db485557cdb3e4479ea42889 as chef +FROM 
lukemathwalker/cargo-chef:latest-rust-alpine3.20@sha256:5b4cc6b770d17769eec91c97e8b85173b1c15a23d218e0c538e05b25a774aa88 as chef WORKDIR /root FROM chef as planner diff --git a/stacks/peerdb-ui.Dockerfile b/stacks/peerdb-ui.Dockerfile index 42cedca118..f976aaee04 100644 --- a/stacks/peerdb-ui.Dockerfile +++ b/stacks/peerdb-ui.Dockerfile @@ -1,4 +1,4 @@ -# syntax=docker/dockerfile:1.11@sha256:10c699f1b6c8bdc8f6b4ce8974855dd8542f1768c26eb240237b8f1c9c6c9976 +# syntax=docker/dockerfile:1.12@sha256:db1ff77fb637a5955317c7a3a62540196396d565f3dd5742e76dddbb6d75c4c5 # Base stage FROM node:22-alpine@sha256:b64ced2e7cd0a4816699fe308ce6e8a08ccba463c757c00c14cd372e3d2c763e AS base diff --git a/ui/package-lock.json b/ui/package-lock.json index c581877582..eea6bce380 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -2255,9 +2255,9 @@ "license": "MIT" }, "node_modules/@types/node": { - "version": "22.10.0", - "resolved": "https://registry.npmjs.org/@types/node/-/node-22.10.0.tgz", - "integrity": "sha512-XC70cRZVElFHfIUB40FgZOBbgJYFKKMa5nb9lxcwYstFG/Mi+/Y0bGS+rs6Dmhmkpq4pnNiLiuZAbc02YCOnmA==", + "version": "22.10.1", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.10.1.tgz", + "integrity": "sha512-qKgsUwfHZV2WCWLAnVP1JqnpE6Im6h3Y0+fYgMTasNQ7V++CBX5OT1as0g0f+OyubbFqhf6XVNIsmN4IIhEgGQ==", "license": "MIT", "dependencies": { "undici-types": "~6.20.0"
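A minimal sketch of the new numeric mapping in action, assuming the post-change qvalue API from this diff (QField.ToDWHColumnType consulting PEERDB_CLICKHOUSE_UNBOUNDED_NUMERIC_AS_STRING through the per-mirror env map, as the ClickHouse e2e test above does); the field name and loop values are illustrative only:

package main

import (
	"context"
	"fmt"

	"github.com/PeerDB-io/peer-flow/generated/protos"
	"github.com/PeerDB-io/peer-flow/model/qvalue"
)

func main() {
	// An unbounded Postgres NUMERIC has no typmod, i.e. precision and scale are both zero.
	unbounded := qvalue.QField{Name: "c", Type: qvalue.QValueKindNumeric, Precision: 0, Scale: 0}

	for _, flag := range []string{"true", "false"} {
		env := map[string]string{"PEERDB_CLICKHOUSE_UNBOUNDED_NUMERIC_AS_STRING": flag}
		colType, err := unbounded.ToDWHColumnType(context.Background(), env, protos.DBType_CLICKHOUSE)
		if err != nil {
			panic(err)
		}
		// Expected: "String" when the setting is enabled, a Decimal type when it is not,
		// matching the behavior exercised by testNumericFF earlier in this diff.
		fmt.Printf("flag=%s -> %s\n", flag, colType)
	}
}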