diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index ff77a74ba5..e5dff81063 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -233,7 +233,7 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou // See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency "select_sequential_consistency": uint64(1), // broken downstream views should not interrupt ingestion - "ignore_materialized_views_with_dropped_target_table": true, + // "ignore_materialized_views_with_dropped_target_table": true, } if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil { return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err) diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 813d3d35c5..8070b78d44 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -776,3 +776,36 @@ func (s ClickHouseSuite) Test_Types_CH() { env.Cancel() e2e.RequireEnvCanceled(s.t, env) } + +func (s ClickHouseSuite) Test_IgnoreViewWithMissingTable() { + srcTableName := s.attachSchemaSuffix("test_ignore_view") + dstTableName := "test_ignore_view" + + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY KEY)", srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("ch_ignore_view"), + TableNameMapping: map[string]string{srcTableName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) + + ch, err := connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig()) + require.NoError(s.t, err) + defer ch.Close() + + tc := e2e.NewTemporalClient(s.t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + require.NoError(s.t, ch.Exec(context.Background(), "create table test_ignore_view_drop(id int) order by id")) + require.NoError(s.t, ch.Exec(context.Background(), fmt.Sprintf( + "create view v as select * from %s union all select * from test_ignore_view_drop", dstTableName, + ))) + require.NoError(s.t, ch.Exec(context.Background(), "drop table test_ignore_view_drop")) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s (id) VALUES (1)", srcTableName)) + require.NoError(s.t, err) + e2e.EnvWaitForEqualTablesWithNames(env, s, "row synced", srcTableName, dstTableName, "id") +}