From 5616b6869369f9ece624abd80821a7b4fea41cc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 24 Dec 2024 15:50:53 +0000 Subject: [PATCH] testing --- flow/connectors/clickhouse/clickhouse.go | 2 +- flow/e2e/clickhouse/peer_flow_ch_test.go | 30 ++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index ff77a74ba5..e5dff81063 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -233,7 +233,7 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou // See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency "select_sequential_consistency": uint64(1), // broken downstream views should not interrupt ingestion - "ignore_materialized_views_with_dropped_target_table": true, + // "ignore_materialized_views_with_dropped_target_table": true, } if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil { return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err) diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 813d3d35c5..7504613bad 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -776,3 +776,33 @@ func (s ClickHouseSuite) Test_Types_CH() { env.Cancel() e2e.RequireEnvCanceled(s.t, env) } + +func (s ClickHouseSuite) Test_IgnoreViewWithMissingTable() { + srcTableName := s.attachSchemaSuffix("test_ignore_view") + dstTableName := "test_ignore_view" + + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY KEY)", srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("ch_ignore_view"), + TableNameMapping: map[string]string{srcTableName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) + + ch, err := connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig()) + require.NoError(s.t, err) + defer ch.Close() + + require.NoError(s.t, ch.Exec( + context.Background(), + fmt.Sprintf("create view v as select * from %[1]s union select * from b", dstTableName), + )) + + tc := e2e.NewTemporalClient(s.t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s (id) VALUES (1)", srcTableName)) +}