Skip to content

Commit

Permalink
Normalize split (#1088)
Browse files Browse the repository at this point in the history
wanting to do testing as if in customer environment

---------

Co-authored-by: Kaushik Iska <[email protected]>
Co-authored-by: Kevin Biju <[email protected]>
Co-authored-by: Amogh Bharadwaj <[email protected]>
Co-authored-by: Kunal Gupta <[email protected]>
  • Loading branch information
5 people authored Jan 16, 2024
1 parent 36df29d commit 4101934
Show file tree
Hide file tree
Showing 135 changed files with 4,870 additions and 3,529 deletions.
15 changes: 15 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
root = true

[*]
charset = utf-8
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true

[*.rs]
indent_style = space
indent_size = 4

[{package.json,*.yml,*.yaml}]
indent_style = space
indent_size = 2
5 changes: 2 additions & 3 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ jobs:
run: |
./generate_protos.sh
- uses: actions/setup-go@v4
- uses: actions/setup-go@v5
with:
go-version: ">=1.21.0"
go-version: "1.21"
cache-dependency-path: flow/go.sum

- name: install gotestsum
Expand Down Expand Up @@ -126,4 +126,3 @@ jobs:
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_DATABASE: postgres
PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10
43 changes: 19 additions & 24 deletions .github/workflows/golang-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,30 @@ on:
branches: [main]
paths: [flow/**]

permissions:
contents: read

jobs:
golangci-lint:
permissions:
checks: write
contents: read
pull-requests: write
strategy:
matrix:
runner: [ubicloud-standard-4-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
golangci:
name: lint
runs-on: [ubicloud-standard-4-ubuntu-2204-arm]
steps:
- name: checkout
uses: actions/checkout@v4
with:
submodules: recursive

- uses: actions/checkout@v4
- uses: bufbuild/[email protected]

- name: setup protos
run: |
./generate_protos.sh
- name: install lib-geos
run: |
sudo apt-get update
sudo apt-get install libgeos-dev
- uses: actions/setup-go@v5
with:
go-version: "1.21"
cache: false
- name: golangci-lint
uses: reviewdog/action-golangci-lint@v2
uses: golangci/golangci-lint-action@v3
with:
workdir: ./flow
reporter: github-pr-review
github_token: ${{ secrets.GITHUB_TOKEN }}
golangci_lint_flags: "--timeout 10m"
fail_on_error: true
env:
REVIEWDOG_TOKEN: ${{ secrets.REVIEWDOG_TOKEN }}
version: v1.55
working-directory: ./flow
args: --timeout=10m
2 changes: 2 additions & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ services:
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113
PEERDB_PASSWORD:
NEXTAUTH_SECRET: __changeme__
NEXTAUTH_URL: http://localhost:3000
depends_on:
- flow-api

Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ services:
<<: *catalog-config
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113
NEXTAUTH_SECRET: __changeme__
NEXTAUTH_URL: http://localhost:3000
depends_on:
- flow-api

Expand Down
99 changes: 33 additions & 66 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (a *FlowableActivity) CheckConnection(
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowName, err)
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(dstConn)
Expand Down Expand Up @@ -196,12 +197,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}
defer connectors.CloseConnector(dstConn)

slog.InfoContext(ctx, "initializing table schema...")
err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
if err != nil {
return nil, fmt.Errorf("failed to initialize table schema: %w", err)
}
activity.RecordHeartbeat(ctx, "initialized table schema")
slog.InfoContext(ctx, "pulling records...")
tblNameMapping := make(map[string]model.NameAndExclude)
for _, v := range input.FlowConnectionConfigs.TableMappings {
Expand All @@ -222,6 +217,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("transferring records for job - %s", jobName)
})
defer shutdown()

// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
startTime := time.Now()
Expand Down Expand Up @@ -276,22 +277,24 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, fmt.Errorf("failed in pull records when: %w", err)
}
slog.InfoContext(ctx, "no records to push")
syncResponse := &model.SyncResponse{}
syncResponse.RelationMessageMapping = <-recordBatch.RelationMessageMapping
syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings)
return syncResponse, nil
}
tableSchemaDeltas := recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings)

shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("pushing records for job - %s", jobName)
})
defer shutdown()
err := dstConn.ReplayTableSchemaDeltas(flowName, tableSchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema: %w", err)
}

return &model.SyncResponse{
RelationMessageMapping: <-recordBatch.RelationMessageMapping,
TableSchemaDeltas: tableSchemaDeltas,
}, nil
}

syncStartTime := time.Now()
res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
TableMappings: input.FlowConnectionConfigs.TableMappings,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
PushBatchSize: input.FlowConnectionConfigs.PushBatchSize,
PushParallelism: input.FlowConnectionConfigs.PushParallelism,
Expand Down Expand Up @@ -355,11 +358,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings)
res.RelationMessageMapping = <-recordBatch.RelationMessageMapping

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount)

return res, nil
}
Expand All @@ -378,35 +380,26 @@ func (a *FlowableActivity) StartNormalize(
}
defer connectors.CloseConnector(dstConn)

lastSyncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get last sync batch ID: %v", err)
}

err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
lastSyncBatchID)
input.SyncBatchID)
return nil, err
} else if err != nil {
return nil, err
}
defer connectors.CloseConnector(dstConn)

shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(ctx, 15*time.Second, func() string {
return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
})
defer shutdown()

slog.InfoContext(ctx, "initializing table schema...")
err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
if err != nil {
return nil, fmt.Errorf("failed to initialize table schema: %w", err)
}

res, err := dstConn.NormalizeRecords(&model.NormalizeRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SoftDelete: input.FlowConnectionConfigs.SoftDelete,
SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SyncBatchID: input.SyncBatchID,
SoftDelete: input.FlowConnectionConfigs.SoftDelete,
SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
})
if err != nil {
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
Expand All @@ -427,35 +420,12 @@ func (a *FlowableActivity) StartNormalize(
}

// log the number of batches normalized
if res != nil {
slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d\n",
res.StartBatchID, res.EndBatchID))
}
slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d\n",
res.StartBatchID, res.EndBatchID))

return res, nil
}

func (a *FlowableActivity) ReplayTableSchemaDeltas(
ctx context.Context,
input *protos.ReplayTableSchemaDeltaInput,
) error {
dest, err := connectors.GetCDCNormalizeConnector(ctx, input.FlowConnectionConfigs.Destination)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return nil
} else if err != nil {
return err
}
defer connectors.CloseConnector(dest)

err = dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas)
if err != nil {
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
return fmt.Errorf("failed to replay table schema deltas: %w", err)
}

return nil
}

// SetupQRepMetadataTables sets up the metadata tables for QReplication.
func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error {
conn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer)
Expand Down Expand Up @@ -624,7 +594,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
}

shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(ctx, 1*time.Minute, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
defer shutdown()
Expand All @@ -637,6 +607,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,

if rowsSynced == 0 {
slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s\n", partition.PartitionId))
pullCancel()
} else {
wg.Wait()
if goroutineErr != nil {
Expand All @@ -653,11 +624,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
if err != nil {
return err
}

return nil
return err
}

func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig,
Expand Down
5 changes: 3 additions & 2 deletions flow/activities/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

Expand All @@ -33,7 +34,7 @@ func (a *FlowableActivity) handleSlotInfo(
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold()
slotLagInMBThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
Expand All @@ -42,7 +43,7 @@ cc: <!channel>`,
}

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold()
maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
res, err := srcConn.GetOpenConnectionsForUser()
if err != nil {
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
Expand Down
18 changes: 14 additions & 4 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(flowJobName string) error {
}

if s, ok := a.SnapshotConnections[flowJobName]; ok {
s.signal.CloneComplete <- struct{}{}
close(s.signal.CloneComplete)
s.connector.Close()
}

Expand Down Expand Up @@ -52,12 +52,22 @@ func (a *SnapshotActivity) SetupReplication(
replicationErr := make(chan error)
defer close(replicationErr)

closeConnectionForError := func(err error) {
slog.ErrorContext(ctx, "failed to setup replication", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
// it is important to close the connection here as it is not closed in CloseSlotKeepAlive
connCloseErr := conn.Close()
if connCloseErr != nil {
slog.ErrorContext(ctx, "failed to close connection", slog.Any("error", connCloseErr))
}
}

// This now happens in a goroutine
go func() {
pgConn := conn.(*connpostgres.PostgresConnector)
err = pgConn.SetupReplication(slotSignal, config)
if err != nil {
slog.ErrorContext(ctx, "failed to setup replication", slog.Any("error", err))
closeConnectionForError(err)
replicationErr <- err
return
}
Expand All @@ -69,12 +79,12 @@ func (a *SnapshotActivity) SetupReplication(
case slotInfo = <-slotSignal.SlotCreated:
slog.InfoContext(ctx, fmt.Sprintf("slot '%s' created", slotInfo.SlotName))
case err := <-replicationErr:
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
closeConnectionForError(err)
return nil, fmt.Errorf("failed to setup replication: %w", err)
}

if slotInfo.Err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, slotInfo.Err)
closeConnectionForError(slotInfo.Err)
return nil, fmt.Errorf("slot error: %w", slotInfo.Err)
}

Expand Down
4 changes: 2 additions & 2 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (h *FlowRequestHandler) FlowStateChange(
}
if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
*currState == protos.FlowStatus_STATUS_RUNNING {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING.Enum())
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING)
if err != nil {
return nil, err
}
Expand All @@ -468,7 +468,7 @@ func (h *FlowRequestHandler) FlowStateChange(
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(*currState == protos.FlowStatus_STATUS_RUNNING || *currState == protos.FlowStatus_STATUS_PAUSED) {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING.Enum())
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 4101934

Please sign in to comment.