Skip to content

Commit

Permalink
testing
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 24, 2024
1 parent 06bd9ae commit 111d609
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
2 changes: 1 addition & 1 deletion flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou
// See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency
"select_sequential_consistency": uint64(1),
// broken downstream views should not interrupt ingestion
"ignore_materialized_views_with_dropped_target_table": true,
// "ignore_materialized_views_with_dropped_target_table": true,
}
if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil {
return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err)
Expand Down
33 changes: 33 additions & 0 deletions flow/e2e/clickhouse/peer_flow_ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,3 +776,36 @@ func (s ClickHouseSuite) Test_Types_CH() {
env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

func (s ClickHouseSuite) Test_IgnoreViewWithMissingTable() {
srcTableName := s.attachSchemaSuffix("test_ignore_view")
dstTableName := "test_ignore_view"

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY KEY)", srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("ch_ignore_view"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)

ch, err := connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig())
require.NoError(s.t, err)
defer ch.Close()

tc := e2e.NewTemporalClient(s.t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

require.NoError(s.t, ch.Exec(context.Background(), "create table test_ignore_view_drop(id int) order by id"))
require.NoError(s.t, ch.Exec(context.Background(), fmt.Sprintf(
"create view v as select * from %s union all select * from test_ignore_view_drop", dstTableName,
)))
require.NoError(s.t, ch.Exec(context.Background(), "drop table test_ignore_view_drop"))

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s (id) VALUES (1)", srcTableName))
require.NoError(s.t, err)
e2e.EnvWaitForEqualTablesWithNames(env, s, "row synced", srcTableName, dstTableName, "id")
}

0 comments on commit 111d609

Please sign in to comment.