
Commit

Merge branch 'main' into ui/improvements-6
iskakaushik authored Dec 25, 2023
2 parents 4536229 + d407f9e commit ab30aa1
Showing 8 changed files with 53 additions and 57 deletions.
29 changes: 22 additions & 7 deletions dev-peerdb.sh
@@ -1,12 +1,27 @@
-#!/bin/sh
+#!/bin/bash
+set -Eeuo pipefail
+if test -z "$USE_PODMAN"
+then
+    if ! command -v docker &> /dev/null
+    then
+        if command -v podman-compose
+        then
+            echo "docker could not be found on PATH, using podman-compose"
+            USE_PODMAN=1
+        else
+            echo "docker could not be found on PATH"
+            exit 1
+        fi
+    fi
+fi
 
-if ! command -v docker &> /dev/null
+if test -z "$USE_PODMAN"
 then
-    echo "docker could not be found on PATH"
-    exit 1
+    DOCKER="docker compose"
+    EXTRA_ARGS="--no-attach temporal --no-attach pyroscope --no-attach temporal-ui"
+else
+    DOCKER="podman-compose --podman-run-args=--replace"
+    EXTRA_ARGS=""
 fi
 
 export PEERDB_VERSION_SHA_SHORT=local-$(git rev-parse --short HEAD)
-docker compose -f docker-compose-dev.yml up --build \
-    --no-attach temporal --no-attach pyroscope --no-attach temporal-ui
+exec $DOCKER -f docker-compose-dev.yml up --build $EXTRA_ARGS
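Note on the rewritten script: it prefers docker, falls back to podman-compose automatically when docker is missing, and execs the chosen compose command so it replaces the shell and receives signals directly. The podman path can also be forced by hand; a usage sketch (not part of the diff):

    # Force podman-compose explicitly instead of relying on detection
    USE_PODMAN=1 ./dev-peerdb.sh

    # Caveat, worth verifying: with `set -u` active, `test -z "$USE_PODMAN"`
    # aborts with an unbound-variable error when USE_PODMAN is entirely unset,
    # so a plain ./dev-peerdb.sh may need `export USE_PODMAN=""` first (or the
    # script could test "${USE_PODMAN:-}" instead).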
2 changes: 1 addition & 1 deletion docker-compose-dev.yml
@@ -44,7 +44,7 @@ services:
     volumes:
       - pgdata:/var/lib/postgresql/data
     healthcheck:
-      test: ["CMD-SHELL", "pg_isready", "-d", "postgres", "-U", "postgres"]
+      test: ["CMD", "pg_isready", "-d", "postgres", "-U", "postgres"]
       interval: 10s
       timeout: 30s
       retries: 5
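Context for the healthcheck fix (standard Compose semantics, not stated in the diff): CMD-SHELL expects a single string to hand to `/bin/sh -c`, so the extra array elements after "pg_isready" were not being passed as arguments; exec-form CMD runs the binary with its arguments directly. The two valid spellings are roughly equivalent:

    healthcheck:
      # exec form, as adopted here: arguments go straight to pg_isready
      test: ["CMD", "pg_isready", "-d", "postgres", "-U", "postgres"]
      # shell form would instead take one string:
      # test: ["CMD-SHELL", "pg_isready -d postgres -U postgres"]

The same fix is applied to docker-compose.yml below.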
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -37,7 +37,7 @@ services:
     volumes:
       - pgdata:/var/lib/postgresql/data
     healthcheck:
-      test: ["CMD-SHELL", "pg_isready", "-d", "postgres", "-U", "postgres"]
+      test: ["CMD", "pg_isready", "-d", "postgres", "-U", "postgres"]
       interval: 10s
       timeout: 30s
       retries: 5
5 changes: 5 additions & 0 deletions flow/activities/slot.go
@@ -23,6 +23,11 @@ func (a *FlowableActivity) handleSlotInfo(
        return err
    }
 
+   if slotInfo == nil || len(slotInfo) == 0 {
+       slog.WarnContext(ctx, "warning: unable to get slot info", slog.Any("slotName", slotName))
+       return nil
+   }
+
    deploymentUIDPrefix := ""
    if peerdbenv.PeerDBDeploymentUID() != "" {
        deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
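A small Go aside on the new guard: `len` of a nil slice is defined to be 0, so the `slotInfo == nil` check is redundant (harmless, but `len(slotInfo) == 0` alone would do). A minimal self-contained illustration:

    package main

    import "fmt"

    func main() {
        var s []int                        // nil slice: len is defined as 0
        fmt.Println(s == nil, len(s) == 0) // prints: true true
    }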
16 changes: 6 additions & 10 deletions flow/connectors/bigquery/bigquery.go
@@ -233,11 +233,11 @@ func (c *BigQueryConnector) InitializeTableSchema(req map[string]*protos.TableSc
    return nil
 }
 
-func (c *BigQueryConnector) WaitForTableReady(tblName string) error {
+func (c *BigQueryConnector) waitForTableReady(tblName string) error {
    table := c.client.Dataset(c.datasetID).Table(tblName)
    maxDuration := 5 * time.Minute
    deadline := time.Now().Add(maxDuration)
-   sleepInterval := 15 * time.Second
+   sleepInterval := 5 * time.Second
    attempt := 0
 
    for {
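With the poll tightened from 15 to 5 seconds and the 5-minute deadline unchanged, the readiness loop now makes up to roughly 60 attempts instead of 20 before giving up.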
@@ -816,20 +816,16 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
            },
        }
        // normalize anything between last normalized batch id to last sync batchid
-       mergeStmts := mergeGen.generateMergeStmts()
-       stmts = append(stmts, mergeStmts...)
+       mergeStmt := mergeGen.generateMergeStmt()
+       stmts = append(stmts, mergeStmt)
    }
    // update metadata to make the last normalized batch id to the recent last sync batch id.
    updateMetadataStmt := fmt.Sprintf(
-       "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name = '%s';",
+       "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
        c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName)
    stmts = append(stmts, updateMetadataStmt)
    stmts = append(stmts, "COMMIT TRANSACTION;")
 
-   // put this within a transaction
-   // TODO - not truncating rows in staging table as of now.
-   // err = c.truncateTable(staging...)
-
    _, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx)
    if err != nil {
        return nil, fmt.Errorf("failed to execute statements %s in a transaction: %v", strings.Join(stmts, "\n"), err)
@@ -931,7 +927,7 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec
        c.datasetID, MirrorJobsTable, jobName, lastSyncedCheckpointID, batchID)
    if hasJob {
        jobStatement = fmt.Sprintf(
-           "UPDATE %s.%s SET offset = %d,sync_batch_id=%d WHERE mirror_job_name = '%s';",
+           "UPDATE %s.%s SET offset=GREATEST(offset,%d),sync_batch_id=%d WHERE mirror_job_name = '%s';",
            c.datasetID, MirrorJobsTable, lastSyncedCheckpointID, batchID, jobName)
    }
 
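The switch to GREATEST makes the metadata update monotonic: if a sync batch is retried or lands out of order, the stored offset can only move forward, never regress. A sketch of the generated SQL, with a placeholder table name and example values (the real dataset and MirrorJobsTable names come from the connector):

    UPDATE my_dataset.peerdb_mirror_jobs
    SET offset=GREATEST(offset,1042), sync_batch_id=7
    WHERE mirror_job_name = 'my_mirror';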
41 changes: 11 additions & 30 deletions flow/connectors/bigquery/merge_statement_generator.go
@@ -8,7 +8,6 @@ import (
    "github.com/PeerDB-io/peer-flow/connectors/utils"
    "github.com/PeerDB-io/peer-flow/generated/protos"
    "github.com/PeerDB-io/peer-flow/model/qvalue"
-   "github.com/PeerDB-io/peer-flow/shared"
 )
 
 type mergeStmtGenerator struct {
@@ -30,24 +29,6 @@ type mergeStmtGenerator struct {
    peerdbCols *protos.PeerDBColumns
 }
 
-// GenerateMergeStmt generates a merge statements.
-func (m *mergeStmtGenerator) generateMergeStmts() []string {
-   // return an empty array for now
-   flattenedCTE := m.generateFlattenedCTE()
-   deDupedCTE := m.generateDeDupedCTE()
-   tempTable := fmt.Sprintf("_peerdb_de_duplicated_data_%s", shared.RandomString(5))
-   // create temp table stmt
-   createTempTableStmt := fmt.Sprintf(
-       "CREATE TEMP TABLE %s AS (%s, %s);",
-       tempTable, flattenedCTE, deDupedCTE)
-
-   mergeStmt := m.generateMergeStmt(tempTable, m.peerdbCols)
-
-   dropTempTableStmt := fmt.Sprintf("DROP TABLE %s;", tempTable)
-
-   return []string{createTempTableStmt, mergeStmt, dropTempTableStmt}
-}
-
 // generateFlattenedCTE generates a flattened CTE.
 func (m *mergeStmtGenerator) generateFlattenedCTE() string {
    // for each column in the normalized table, generate CAST + JSON_EXTRACT_SCALAR
@@ -129,7 +110,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
 }
 
 // generateMergeStmt generates a merge statement.
-func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *protos.PeerDBColumns) string {
+func (m *mergeStmtGenerator) generateMergeStmt() string {
    // comma separated list of column names
    backtickColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns))
    pureColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns))
@@ -138,13 +119,13 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *pro
        pureColNames = append(pureColNames, colName)
    }
    csep := strings.Join(backtickColNames, ", ")
-   insertColumnsSQL := csep + fmt.Sprintf(", `%s`", peerdbCols.SyncedAtColName)
+   insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName)
    insertValuesSQL := csep + ",CURRENT_TIMESTAMP"
 
    updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
-       m.UnchangedToastColumns, peerdbCols)
+       m.UnchangedToastColumns, m.peerdbCols)
    if m.peerdbCols.SoftDelete {
-       softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", peerdbCols.SoftDeleteColName)
+       softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", m.peerdbCols.SoftDeleteColName)
        softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE"
 
        updateStatementsforToastCols = append(updateStatementsforToastCols,
@@ -162,25 +143,25 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *pro
    pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")
 
    deletePart := "DELETE"
-   if peerdbCols.SoftDelete {
-       colName := peerdbCols.SoftDeleteColName
+   if m.peerdbCols.SoftDelete {
+       colName := m.peerdbCols.SoftDeleteColName
        deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", colName)
-       if peerdbCols.SyncedAtColName != "" {
+       if m.peerdbCols.SyncedAtColName != "" {
            deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP",
-               deletePart, peerdbCols.SyncedAtColName)
+               deletePart, m.peerdbCols.SyncedAtColName)
        }
    }
 
    return fmt.Sprintf(`
-   MERGE %s.%s _peerdb_target USING %s _peerdb_deduped
+   MERGE %s.%s _peerdb_target USING (%s,%s) _peerdb_deduped
    ON %s
    WHEN NOT MATCHED and (_peerdb_deduped._peerdb_record_type != 2) THEN
        INSERT (%s) VALUES (%s)
    %s
    WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
        %s;
-   `, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, insertColumnsSQL, insertValuesSQL,
-       updateStringToastCols, deletePart)
+   `, m.Dataset, m.NormalizedTable, m.generateFlattenedCTE(), m.generateDeDupedCTE(),
+       pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
 }
 
 /*
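Net effect of this refactor: instead of materializing a randomly named temp table (CREATE TEMP TABLE ... AS the flattened and de-duplicated CTEs, then MERGE, then DROP TABLE), the two CTEs are inlined straight into the MERGE's USING clause, so normalization becomes a single statement and the shared.RandomString dependency goes away. Schematically, with placeholder names (abbreviated, not the exact generated SQL):

    MERGE my_dataset.my_table _peerdb_target USING (
      WITH _peerdb_flattened AS (...),  -- generateFlattenedCTE()
      _peerdb_de_duplicated AS (...)    -- generateDeDupedCTE()
      SELECT ...
    ) _peerdb_deduped
    ON _peerdb_target.pk = _peerdb_deduped.pk
    WHEN NOT MATCHED AND (_peerdb_deduped._peerdb_record_type != 2) THEN
      INSERT (...) VALUES (...)
    WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
      DELETE;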
3 changes: 1 addition & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -343,7 +343,6 @@ func (s *QRepAvroSyncMethod) writeToStage(
        slog.String("batchOrPartitionID", syncID),
    )
    if s.gcsBucket != "" {
-
        bucket := s.connector.storageClient.Bucket(s.gcsBucket)
        avroFilePath := fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
        obj := bucket.Object(avroFilePath)
@@ -415,7 +414,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
    }
    slog.Info(fmt.Sprintf("Pushed into %s/%s", avroFile.FilePath, syncID))
 
-   err = s.connector.WaitForTableReady(stagingTable)
+   err = s.connector.waitForTableReady(stagingTable)
    if err != nil {
        return 0, fmt.Errorf("failed to wait for table to be ready: %w", err)
    }
12 changes: 6 additions & 6 deletions flow/connectors/snowflake/snowflake.go
@@ -39,11 +39,10 @@ const (
    rawTableMultiValueInsertSQL = "INSERT INTO %s.%s VALUES%s"
    createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"
    toVariantColumnName = "VAR_COLS"
-   mergeStatementSQL = `MERGE INTO %s TARGET USING (WITH VARIANT_CONVERTED AS (SELECT _PEERDB_UID,
-   _PEERDB_TIMESTAMP,
-   TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,
-   _PEERDB_UNCHANGED_TOAST_COLUMNS FROM
-   _PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND
+   mergeStatementSQL = `MERGE INTO %s TARGET USING (WITH VARIANT_CONVERTED AS (
+   SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE,
+   _PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,_PEERDB_UNCHANGED_TOAST_COLUMNS
+   FROM _PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND
    _PEERDB_DESTINATION_TABLE_NAME = ? ), FLATTENED AS
    (SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,
    _PEERDB_UNCHANGED_TOAST_COLUMNS,%s
@@ -66,7 +65,8 @@ const (
 
    insertJobMetadataSQL = "INSERT INTO %s.%s VALUES (?,?,?,?)"
 
-   updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET OFFSET=?, SYNC_BATCH_ID=? WHERE MIRROR_JOB_NAME=?"
+   updateMetadataForSyncRecordsSQL = `UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?), SYNC_BATCH_ID=?
+   WHERE MIRROR_JOB_NAME=?`
    updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET NORMALIZE_BATCH_ID=? WHERE MIRROR_JOB_NAME=?"
 
    checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES
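The Snowflake metadata update gets the same monotonic-offset treatment as the BigQuery change above: GREATEST(OFFSET, ?) ensures a retried or reordered sync can never move the stored offset backwards. The accompanying mergeStatementSQL edit appears to be a pure reflow of the same query text.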
