Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynamic properties test #1216

Merged
merged 10 commits into from
Feb 12, 2024
378 changes: 201 additions & 177 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/sdk/worker"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

Expand Down Expand Up @@ -1103,180 +1106,201 @@ func (s PeerFlowE2ETestSuitePG) Test_Supported_Mixed_Case_Table() {

// test doesn't work; make it work later

// func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
// env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
// // needed otherwise errors out
// workerOptions := worker.Options{
// EnableSessionWorker: true,
// }
// env.SetWorkerOptions(workerOptions)

// srcTable1Name := s.attachSchemaSuffix("test_dynconfig_1")
// srcTable2Name := s.attachSchemaSuffix("test_dynconfig_2")
// dstTable1Name := s.attachSchemaSuffix("test_dynconfig_1_dst")
// dstTable2Name := s.attachSchemaSuffix("test_dynconfig_2_dst")
// sentPause := false
// sentUpdate := false

// // signals in tests are weird, you need to register them before starting the workflow
// // otherwise you guessed it, errors out. really don't like this.
// // too short of a gap between signals also causes issues
// // might have something to do with how test workflows handle fast-forwarding time.
// env.RegisterDelayedCallback(func() {
// env.SignalWorkflow(shared.FlowSignalName, shared.PauseSignal)
// s.t.Log("Sent pause signal")
// sentPause = true
// }, 28*time.Second)
// env.RegisterDelayedCallback(func() {
// env.SignalWorkflow(shared.CDCDynamicPropertiesSignalName, &protos.CDCFlowConfigUpdate{
// IdleTimeout: 14,
// BatchSize: 12,
// AdditionalTables: []*protos.TableMapping{
// {
// SourceTableIdentifier: srcTable2Name,
// DestinationTableIdentifier: dstTable2Name,
// },
// },
// })
// s.t.Log("Sent update signal")
// sentUpdate = true
// }, 56*time.Second)
// env.RegisterDelayedCallback(func() {
// env.SignalWorkflow(shared.FlowSignalName, shared.NoopSignal)
// s.t.Log("Sent resume signal")
// }, 84*time.Second)

// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(`
// CREATE TABLE IF NOT EXISTS %s (
// id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
// t TEXT DEFAULT md5(random()::text));
// CREATE TABLE IF NOT EXISTS %s (
// id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
// t TEXT DEFAULT md5(random()::text));
// `, srcTable1Name, srcTable2Name))
// require.NoError(s.t, err)

// connectionGen := e2e.FlowConnectionGenerationConfig{
// FlowJobName: s.attachSuffix("test_dynconfig"),
// }

// config := &protos.FlowConnectionConfigs{
// FlowJobName: connectionGen.FlowJobName,
// Destination: s.peer,
// TableMappings: []*protos.TableMapping{
// {
// SourceTableIdentifier: srcTable1Name,
// DestinationTableIdentifier: dstTable1Name,
// },
// },
// Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
// CdcStagingPath: connectionGen.CdcStagingPath,
// MaxBatchSize: 6,
// IdleTimeoutSeconds: 7,
// DoInitialSnapshot: true,
// SnapshotNumRowsPerPartition: 1000,
// SnapshotMaxParallelWorkers: 1,
// SnapshotNumTablesInParallel: 1,
// }

// addRows := func(numRows int) {
// for i := 0; i < numRows; i++ {
// _, err = s.conn.Exec(context.Background(),
// fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable1Name))
// e2e.EnvNoError(s.t, env, err)
// _, err = s.conn.Exec(context.Background(),
// fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable2Name))
// e2e.EnvNoError(s.t, env, err)
// }
// s.t.Logf("Inserted %d rows into the source table", numRows)
// }

// getWorkFlowState := func() peerflow.CDCFlowWorkflowState {
// var workflowState peerflow.CDCFlowWorkflowState
// val, err := env.QueryWorkflow(shared.CDCFlowStateQuery)
// e2e.EnvNoError(s.t, env, err)
// err = val.Get(&workflowState)
// e2e.EnvNoError(s.t, env, err)

// return workflowState
// }

// getFlowStatus := func() protos.FlowStatus {
// var flowStatus protos.FlowStatus
// val, err := env.QueryWorkflow(shared.FlowStatusQuery)
// e2e.EnvNoError(s.t, env, err)
// err = val.Get(&flowStatus)
// e2e.EnvNoError(s.t, env, err)

// return flowStatus
// }

// // add before to test initial load too.
// addRows(18)
// go func() {
// e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// // insert 18 rows into the source tables, exactly 3 batches
// addRows(18)

// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
// return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
// })

// workflowState := getWorkFlowState()
// require.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
// require.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
// require.EqualValues(s.t, 1, len(workflowState.TableMappings))
// require.EqualValues(s.t, 1, len(workflowState.SrcTableIdNameMapping))
// require.EqualValues(s.t, 1, len(workflowState.TableNameSchemaMapping))
// // we have limited batch size to 6, so atleast 3 syncs needed
// require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)

// // wait for first RegisterDelayedCallback to hit.
// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
// return sentPause
// })
// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
// // keep adding 1 more row - guarantee finishing another sync
// addRows(1)
// flowStatus := getFlowStatus()
// return flowStatus == protos.FlowStatus_STATUS_PAUSED
// })
// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
// return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
// })

// // we have a paused mirror, wait for second signal to hit.
// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
// return sentUpdate
// })

// // add rows to both tables before resuming - should handle
// addRows(18)

// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
// flowStatus := getFlowStatus()
// return flowStatus == protos.FlowStatus_STATUS_RUNNING
// })
// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
// return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
// })
// e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
// return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
// })

// workflowState = getWorkFlowState()
// require.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
// require.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
// require.EqualValues(s.t, 2, len(workflowState.TableMappings))
// require.EqualValues(s.t, 2, len(workflowState.SrcTableIdNameMapping))
// require.EqualValues(s.t, 2, len(workflowState.TableNameSchemaMapping))
// // 3 from first insert of 18 rows in 1 table
// // 1 from pre-pause
// // 3 from second insert of 18 rows in 2 tables, batch size updated
// require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)

// env.CancelWorkflow()
// }()

// env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
// }
func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
// needed otherwise errors out
workerOptions := worker.Options{
EnableSessionWorker: true,
}
env.SetWorkerOptions(workerOptions)

srcTable1Name := s.attachSchemaSuffix("test_dynconfig_1")
srcTable2Name := s.attachSchemaSuffix("test_dynconfig_2")
dstTable1Name := s.attachSchemaSuffix("test_dynconfig_1_dst")
dstTable2Name := s.attachSchemaSuffix("test_dynconfig_2_dst")
sentPause := false
isPaused := false
sentUpdate := false

_, err := s.conn.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
t TEXT DEFAULT md5(random()::text));
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
t TEXT DEFAULT md5(random()::text));
`, srcTable1Name, srcTable2Name))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_dynconfig"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTable1Name,
DestinationTableIdentifier: dstTable1Name,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcStagingPath: connectionGen.CdcStagingPath,
MaxBatchSize: 6,
IdleTimeoutSeconds: 7,
DoInitialSnapshot: true,
SnapshotNumRowsPerPartition: 1000,
SnapshotMaxParallelWorkers: 1,
SnapshotNumTablesInParallel: 1,
}

addRows := func(numRows int) {
for i := 0; i < numRows; i++ {
_, err = s.conn.Exec(context.Background(),
fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable1Name))
e2e.EnvNoError(s.t, env, err)
_, err = s.conn.Exec(context.Background(),
fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable2Name))
e2e.EnvNoError(s.t, env, err)
}
s.t.Logf("Inserted %d rows into the source table", numRows)
}

getWorkFlowState := func() peerflow.CDCFlowWorkflowState {
var workflowState peerflow.CDCFlowWorkflowState
val, err := env.QueryWorkflow(shared.CDCFlowStateQuery)
e2e.EnvNoError(s.t, env, err)
err = val.Get(&workflowState)
e2e.EnvNoError(s.t, env, err)

return workflowState
}

getFlowStatus := func() protos.FlowStatus {
var flowStatus protos.FlowStatus
val, err := env.QueryWorkflow(shared.FlowStatusQuery)
e2e.EnvNoError(s.t, env, err)
err = val.Get(&flowStatus)
e2e.EnvNoError(s.t, env, err)

return flowStatus
}

// signals in tests are weird, you need to register them before starting the workflow
// otherwise you guessed it, errors out. really don't like this.
// too short of a gap between signals also causes issues
// might have something to do with how test workflows handle fast-forwarding time.
env.RegisterDelayedCallback(func() {
env.SignalWorkflow(shared.FlowSignalName, shared.PauseSignal)
s.t.Log("Sent pause signal")
sentPause = true
}, 28*time.Second)
// this signal being sent also unblocks another WaitFor
env.RegisterDelayedCallback(func() {
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send update signal after pause confirmed", func() bool {
flowStatus := getFlowStatus()
if flowStatus != protos.FlowStatus_STATUS_PAUSED {
return false
}
isPaused = true
env.SignalWorkflow(shared.CDCDynamicPropertiesSignalName, &protos.CDCFlowConfigUpdate{
IdleTimeout: 14,
BatchSize: 12,
AdditionalTables: []*protos.TableMapping{
{
SourceTableIdentifier: srcTable2Name,
DestinationTableIdentifier: dstTable2Name,
},
},
})
s.t.Log("Sent update signal")
sentUpdate = true
return true
})
}, 56*time.Second)
env.RegisterDelayedCallback(func() {
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool {
if !sentUpdate {
return false
}
env.SignalWorkflow(shared.FlowSignalName, shared.NoopSignal)
s.t.Log("Sent resume signal")
return true
})
}, 84*time.Second)

// add before to test initial load too.
addRows(18)
go func() {
defer env.CancelWorkflow()
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 18 rows into the source tables, exactly 3 batches
addRows(18)

e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})

workflowState := getWorkFlowState()
assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.TableMappings, 1)
assert.Len(s.t, workflowState.SrcTableIdNameMapping, 1)
assert.Len(s.t, workflowState.TableNameSchemaMapping, 1)
// we have limited batch size to 6, so atleast 3 syncs needed
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)

heavycrystal marked this conversation as resolved.
Show resolved Hide resolved
if !s.t.Failed() {
// wait for first RegisterDelayedCallback to hit.
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
return sentPause
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
// keep adding 1 more row - guarantee finishing another sync
addRows(1)
// isPaused - set from the WaitFor that sends update signal
return isPaused
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})

// we have a paused mirror, wait for second signal to hit.
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
return sentUpdate
})
}

// add rows to both tables before resuming - should handle
addRows(18)

if !s.t.Failed() {
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
})
}

if !s.t.Failed() {
workflowState = getWorkFlowState()
assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.TableMappings, 2)
assert.Len(s.t, workflowState.SrcTableIdNameMapping, 2)
assert.Len(s.t, workflowState.TableNameSchemaMapping, 2)
// 3 from first insert of 18 rows in 1 table
// 1 from pre-pause
// 3 from second insert of 18 rows in 2 tables, batch size updated
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)
}
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
heavycrystal marked this conversation as resolved.
Show resolved Hide resolved
e2e.RequireEnvCanceled(s.t, env)
}
Loading