Skip to content

Commit

Permalink
Merge branch 'main' into bq/fix-resync
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 16, 2024
2 parents f5da8ea + 4be0d0a commit 8645157
Show file tree
Hide file tree
Showing 60 changed files with 2,339 additions and 1,445 deletions.
2 changes: 1 addition & 1 deletion .github/actions/genprotos/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ runs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
- name: check cache
id: cache
uses: ubicloud/cache@0a97811d53629b143a56b3c2b1f729fd11719ef7 # v4
uses: ubicloud/cache@92361f338d82d2c58a98875f1b5c95cd14cd6b2a # v4
with:
path: |
./flow/generated/protos
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }}
dir: "nexus/server/tests/assets/"

- uses: ubicloud/rust-cache@69587b2b3f26e8938580c44a643d265ed12f3119 # v2
- uses: ubicloud/rust-cache@65b3ff06b9bcc69d88c25e212f1ae3d14a0953c3 # v2
with:
workspaces: nexus

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cleanup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: checkout sources
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4

- uses: ubicloud/setup-go@35680fe0723d4a9309d4b1ac1c67e0d46eac5f24 # v5
- uses: ubicloud/setup-go@6ea6c4fb36acc657571a53f4a7471e75f5fa80ad # v5
with:
go-version: '1.23.0'
cache-dependency-path: e2e_cleanup/go.sum
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ jobs:
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@f09c1c0a94de965c15400f5634aa42fac8fb8f88 # v3
uses: github/codeql-action/init@df409f7d9260372bd5f19e5b04e83cb3c43714ae # v3
with:
languages: ${{ matrix.language }}
build-mode: ${{ matrix.build-mode }}

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@f09c1c0a94de965c15400f5634aa42fac8fb8f88 # v3
uses: github/codeql-action/analyze@df409f7d9260372bd5f19e5b04e83cb3c43714ae # v3
with:
category: "/language:${{matrix.language}}"
16 changes: 8 additions & 8 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ jobs:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
POSTGRES_INITDB_ARGS: --locale=C.UTF-8
redpanda:
image: redpandadata/redpanda@sha256:f2f8bb89f1a0747cc6f86440cb3a0916e981e136e1d72392bab179f73492fb0f
ports:
- 9092:9092
- 9644:9644
elasticsearch:
image: elasticsearch:8.16.1@sha256:e5ee5f8dacbf18fa3ab59a098cc7d4d69f73e61637eb45f1c029e74b1cb200a1
ports:
Expand All @@ -33,7 +38,7 @@ jobs:
xpack.security.enabled: false
xpack.security.enrollment.enabled: false
minio:
image: bitnami/minio:2024.11.7@sha256:9f2d9c45006a2ada1bc485e1393291ce7d54ae1a46260dd491381a4eb8b2fd47
image: bitnami/minio:2024.12.13@sha256:2a258ab6876f6ed3cd5609836d065f20927955a2ae721fd9edde8ca388b52135
ports:
- 9999:9999
env:
Expand All @@ -49,7 +54,7 @@ jobs:
- name: generate or hydrate protos
uses: ./.github/actions/genprotos

- uses: ubicloud/setup-go@35680fe0723d4a9309d4b1ac1c67e0d46eac5f24 # v5
- uses: ubicloud/setup-go@6ea6c4fb36acc657571a53f4a7471e75f5fa80ad # v5
with:
go-version: '1.23.0'
cache-dependency-path: flow/go.sum
Expand Down Expand Up @@ -109,12 +114,7 @@ jobs:
env:
PGPASSWORD: postgres

- name: start redpanda
uses: redpanda-data/github-action@c68af8edc420b987e871615ca40b3a5dd70eb5b1 # v0.1.4
with:
version: "latest"

- uses: ubicloud/cache@0a97811d53629b143a56b3c2b1f729fd11719ef7 # v4
- uses: ubicloud/cache@92361f338d82d2c58a98875f1b5c95cd14cd6b2a # v4
id: cache-clickhouse
with:
path: ./clickhouse
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/golang-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
run: |
sudo apt-get update
sudo apt-get install libgeos-dev
- uses: ubicloud/setup-go@35680fe0723d4a9309d4b1ac1c67e0d46eac5f24 # v5
- uses: ubicloud/setup-go@6ea6c4fb36acc657571a53f4a7471e75f5fa80ad # v5
with:
go-version: '1.23.0'
cache: false
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/update-docker-compose-stable.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
update-docker-compose-tag:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
with:
ref: main
- name: create-PR
Expand Down
2 changes: 1 addition & 1 deletion docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ x-flow-worker-env: &flow-worker-env
services:
catalog:
container_name: catalog
image: postgres:17-alpine@sha256:e7897baa70dae1968d23d785adb4aeb699175e0bcaae44f98a7083ecb9668b93
image: postgres:17-alpine@sha256:d37d2c160d34430877c802e5adc22824a2ad453499db9bab1a2ceb2be6c1a46f
command: -c config_file=/etc/postgresql.conf
ports:
- 9901:5432
Expand Down
12 changes: 6 additions & 6 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ x-flow-worker-env: &flow-worker-env
services:
catalog:
container_name: catalog
image: postgres:17-alpine@sha256:e7897baa70dae1968d23d785adb4aeb699175e0bcaae44f98a7083ecb9668b93
image: postgres:17-alpine@sha256:d37d2c160d34430877c802e5adc22824a2ad453499db9bab1a2ceb2be6c1a46f
command: -c config_file=/etc/postgresql.conf
restart: unless-stopped
ports:
Expand Down Expand Up @@ -112,7 +112,7 @@ services:

flow-api:
container_name: flow_api
image: ghcr.io/peerdb-io/flow-api:stable-v0.20.0
image: ghcr.io/peerdb-io/flow-api:stable-v0.20.2
restart: unless-stopped
ports:
- 8112:8112
Expand All @@ -128,7 +128,7 @@ services:

flow-snapshot-worker:
container_name: flow-snapshot-worker
image: ghcr.io/peerdb-io/flow-snapshot-worker:stable-v0.20.0
image: ghcr.io/peerdb-io/flow-snapshot-worker:stable-v0.20.2
restart: unless-stopped
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
Expand All @@ -138,7 +138,7 @@ services:

flow-worker:
container_name: flow-worker
image: ghcr.io/peerdb-io/flow-worker:stable-v0.20.0
image: ghcr.io/peerdb-io/flow-worker:stable-v0.20.2
restart: unless-stopped
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
Expand All @@ -151,7 +151,7 @@ services:
peerdb:
container_name: peerdb-server
stop_signal: SIGINT
image: ghcr.io/peerdb-io/peerdb-server:stable-v0.20.0
image: ghcr.io/peerdb-io/peerdb-server:stable-v0.20.2
restart: unless-stopped
environment:
<<: *catalog-config
Expand All @@ -167,7 +167,7 @@ services:

peerdb-ui:
container_name: peerdb-ui
image: ghcr.io/peerdb-io/peerdb-ui:stable-v0.20.0
image: ghcr.io/peerdb-io/peerdb-ui:stable-v0.20.2
restart: unless-stopped
ports:
- 3000:3000
Expand Down
5 changes: 2 additions & 3 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"log/slog"
"strings"
"time"

"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -120,7 +119,7 @@ func (h *FlowRequestHandler) ListPeers(
req *protos.ListPeersRequest,
) (*protos.ListPeersResponse, error) {
query := "SELECT name, type FROM peers"
if peerdbenv.PeerDBAllowedTargets() == strings.ToLower(protos.DBType_CLICKHOUSE.String()) {
if peerdbenv.PeerDBOnlyClickHouseAllowed() {
// only postgres and clickhouse peers
query += " WHERE type IN (3, 8)"
}
Expand Down Expand Up @@ -148,7 +147,7 @@ func (h *FlowRequestHandler) ListPeers(
}

destinationItems := peers
if peerdbenv.PeerDBAllowedTargets() == strings.ToLower(protos.DBType_CLICKHOUSE.String()) {
if peerdbenv.PeerDBOnlyClickHouseAllowed() {
destinationItems = make([]*protos.PeerListItem, 0, len(peers))
for _, peer := range peers {
// only clickhouse peers
Expand Down
3 changes: 1 addition & 2 deletions flow/cmd/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"log/slog"
"slices"
"strings"

"github.com/jackc/pgx/v5"

Expand Down Expand Up @@ -37,7 +36,7 @@ func (h *FlowRequestHandler) GetDynamicSettings(
return nil, err
}

if peerdbenv.PeerDBAllowedTargets() == strings.ToLower(protos.DBType_CLICKHOUSE.String()) {
if peerdbenv.PeerDBOnlyClickHouseAllowed() {
filteredSettings := make([]*protos.DynamicSetting, 0)
for _, setting := range settings {
if setting.TargetForSetting == protos.DynconfTarget_ALL ||
Expand Down
8 changes: 7 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(
ctx context.Context,
flowJobName string,
batchId int64,
tableToSchema map[string]*protos.TableSchema,
) ([]string, error) {
rawTableName := c.getRawTableName(flowJobName)

Expand Down Expand Up @@ -283,7 +284,11 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(
}
if len(row) > 0 {
value := row[0].(string)
distinctTableNames = append(distinctTableNames, value)
if _, ok := tableToSchema[value]; ok {
distinctTableNames = append(distinctTableNames, value)
} else {
c.logger.Warn("table not found in table to schema mapping", "table", value)
}
}
}

Expand Down Expand Up @@ -446,6 +451,7 @@ func (c *BigQueryConnector) mergeTablesInThisBatch(
ctx,
flowName,
batchId,
tableToSchema,
)
if err != nil {
return fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE(dstTable string, normalizedTab
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt16,
qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString,
qvalue.QValueKindArrayBoolean, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ,
qvalue.QValueKindArrayDate:
qvalue.QValueKindArrayDate, qvalue.QValueKindArrayUUID:
castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element WHERE element IS NOT null) AS `%s`",
bqTypeString, column.Name, shortCol)
Expand Down
8 changes: 7 additions & 1 deletion flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func qValueKindToBigQueryType(columnDescription *protos.FieldDescription, nullab
// string related
case qvalue.QValueKindString:
bqField.Type = bigquery.StringFieldType
// json also is stored as string for now
// json related
case qvalue.QValueKindJSON, qvalue.QValueKindJSONB, qvalue.QValueKindHStore:
bqField.Type = bigquery.JSONFieldType
// time related
Expand Down Expand Up @@ -69,6 +69,12 @@ func qValueKindToBigQueryType(columnDescription *protos.FieldDescription, nullab
bqField.Repeated = true
case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
bqField.Type = bigquery.GeographyFieldType
// UUID related - stored as strings for now
case qvalue.QValueKindUUID:
bqField.Type = bigquery.StringFieldType
case qvalue.QValueKindArrayUUID:
bqField.Type = bigquery.StringFieldType
bqField.Repeated = true
// rest will be strings
default:
bqField.Type = bigquery.StringFieldType
Expand Down
Loading

0 comments on commit 8645157

Please sign in to comment.