From 7c4e1b23004af279ccd1ca1f5f457b31633f709d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 13 Nov 2024 18:48:15 +0000 Subject: [PATCH] e2e: resync weird names (#2237) fixes #2236 --- flow/cmd/handler.go | 28 ++++------ flow/connectors/postgres/client.go | 5 +- flow/connectors/postgres/postgres.go | 44 +++++++--------- flow/connectors/postgres/validate.go | 6 +-- flow/e2e/clickhouse/peer_flow_ch_test.go | 66 +++++++++++++++++++----- flow/e2e/congen.go | 22 +++----- flow/e2e/test_utils.go | 6 +-- 7 files changed, 96 insertions(+), 81 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 8b30331ae8..e2d1da2e39 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -408,7 +408,7 @@ func (h *FlowRequestHandler) handleCancelWorkflow(ctx context.Context, workflowI if err != nil { slog.Error(fmt.Sprintf("unable to cancel PeerFlow workflow: %s. Attempting to terminate.", err.Error())) terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID) - if err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil { + if err := h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil { return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err) } } @@ -456,10 +456,9 @@ func (h *FlowRequestHandler) DropPeer( } var inMirror pgtype.Int8 - queryErr := h.pool.QueryRow(ctx, - "SELECT COUNT(*) FROM flows WHERE source_peer=$1 or destination_peer=$2", - peerID, peerID).Scan(&inMirror) - if queryErr != nil { + if queryErr := h.pool.QueryRow(ctx, + "SELECT COUNT(*) FROM flows WHERE source_peer=$1 or destination_peer=$1", peerID, + ).Scan(&inMirror); queryErr != nil { return nil, fmt.Errorf("failed to check for existing mirrors with peer %s: %w", req.PeerName, queryErr) } @@ -467,8 +466,7 @@ func (h *FlowRequestHandler) DropPeer( return nil, fmt.Errorf("peer %s is currently involved in an ongoing mirror", req.PeerName) } - _, delErr := h.pool.Exec(ctx, "DELETE FROM peers WHERE name = $1", req.PeerName) - if delErr != nil { + if _, delErr := h.pool.Exec(ctx, "DELETE FROM peers WHERE name = $1", req.PeerName); delErr != nil { return nil, fmt.Errorf("failed to delete peer %s from metadata table: %w", req.PeerName, delErr) } @@ -477,9 +475,8 @@ func (h *FlowRequestHandler) DropPeer( func (h *FlowRequestHandler) getWorkflowID(ctx context.Context, flowJobName string) (string, error) { q := "SELECT workflow_id FROM flows WHERE name = $1" - row := h.pool.QueryRow(ctx, q, flowJobName) var workflowID string - if err := row.Scan(&workflowID); err != nil { + if err := h.pool.QueryRow(ctx, q, flowJobName).Scan(&workflowID); err != nil { return "", fmt.Errorf("unable to get workflowID for flow job %s: %w", flowJobName, err) } @@ -507,22 +504,19 @@ func (h *FlowRequestHandler) ResyncMirror( config.Resync = true config.DoInitialSnapshot = true // validate mirror first because once the mirror is dropped, there's no going back - _, err = h.ValidateCDCMirror(ctx, &protos.CreateCDCFlowRequest{ + if _, err := h.ValidateCDCMirror(ctx, &protos.CreateCDCFlowRequest{ ConnectionConfigs: config, - }) - if err != nil { + }); err != nil { return nil, err } - err = h.shutdownFlow(ctx, req.FlowJobName, req.DropStats) - if err != nil { + if err := h.shutdownFlow(ctx, req.FlowJobName, req.DropStats); err != nil { return nil, err } - _, err = h.CreateCDCFlow(ctx, &protos.CreateCDCFlowRequest{ + if _, err := h.CreateCDCFlow(ctx, &protos.CreateCDCFlowRequest{ ConnectionConfigs: config, - }) - if err != nil { + }); err != nil { return nil, err } return &protos.ResyncMirrorResponse{}, nil diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 2d480f780f..1daabbf684 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -376,8 +376,7 @@ func (c *PostgresConnector) createSlotAndPublication( } srcTableNames = append(srcTableNames, parsedSrcTableName.String()) } - err := c.CreatePublication(ctx, srcTableNames, publication) - if err != nil { + if err := c.CreatePublication(ctx, srcTableNames, publication); err != nil { signal.SlotCreated <- SlotCreationResult{Err: err} return } @@ -395,7 +394,7 @@ func (c *PostgresConnector) createSlotAndPublication( c.logger.Warn(fmt.Sprintf("Creating replication slot '%s'", slot)) // THIS IS NOT IN A TX! - if _, err = conn.Exec(ctx, "SET idle_in_transaction_session_timeout=0"); err != nil { + if _, err := conn.Exec(ctx, "SET idle_in_transaction_session_timeout=0"); err != nil { signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error setting idle_in_transaction_session_timeout: %w", err)} return } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index d0087d3beb..b3161161e1 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -1096,7 +1096,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig return } - tableNameMapping := make(map[string]model.NameAndExclude) + tableNameMapping := make(map[string]model.NameAndExclude, len(req.TableNameMapping)) for k, v := range req.TableNameMapping { tableNameMapping[k] = model.NameAndExclude{ Name: v, @@ -1110,9 +1110,9 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) error { // Slotname would be the job name prefixed with "peerflow_slot_" slotName := "peerflow_slot_" + jobName - _, err := c.conn.Exec(ctx, `SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots - WHERE slot_name=$1`, slotName) - if err != nil { + if _, err := c.conn.Exec( + ctx, `SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name=$1`, slotName, + ); err != nil { return fmt.Errorf("error dropping replication slot: %w", err) } @@ -1122,14 +1122,14 @@ func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) // as drop publication if exists requires permissions // for a publication which we did not create via peerdb user var publicationExists bool - err = c.conn.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM pg_publication WHERE pubname=$1)", publicationName).Scan(&publicationExists) - if err != nil { + if err := c.conn.QueryRow( + ctx, "SELECT EXISTS(SELECT 1 FROM pg_publication WHERE pubname=$1)", publicationName, + ).Scan(&publicationExists); err != nil { return fmt.Errorf("error checking if publication exists: %w", err) } if publicationExists { - _, err = c.conn.Exec(ctx, "DROP PUBLICATION IF EXISTS "+publicationName) - if err != nil { + if _, err := c.conn.Exec(ctx, "DROP PUBLICATION IF EXISTS "+publicationName); err != nil { return fmt.Errorf("error dropping publication: %w", err) } } @@ -1144,9 +1144,9 @@ func (c *PostgresConnector) SyncFlowCleanup(ctx context.Context, jobName string) } defer shared.RollbackTx(syncFlowCleanupTx, c.logger) - _, err = c.execWithLoggingTx(ctx, fmt.Sprintf(dropTableIfExistsSQL, c.metadataSchema, - getRawTableIdentifier(jobName)), syncFlowCleanupTx) - if err != nil { + if _, err := c.execWithLoggingTx(ctx, + fmt.Sprintf(dropTableIfExistsSQL, c.metadataSchema, getRawTableIdentifier(jobName)), syncFlowCleanupTx, + ); err != nil { return fmt.Errorf("unable to drop raw table: %w", err) } @@ -1162,8 +1162,7 @@ func (c *PostgresConnector) SyncFlowCleanup(ctx context.Context, jobName string) } } - err = syncFlowCleanupTx.Commit(ctx) - if err != nil { + if err := syncFlowCleanupTx.Commit(ctx); err != nil { return fmt.Errorf("unable to commit transaction for sync flow cleanup: %w", err) } return nil @@ -1222,9 +1221,9 @@ func (c *PostgresConnector) HandleSlotInfo( attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) var intervalSinceLastNormalize *time.Duration - err = alerter.CatalogPool.QueryRow(ctx, "SELECT now()-max(end_time) FROM peerdb_stats.cdc_batches WHERE flow_name=$1", - alertKeys.FlowName).Scan(&intervalSinceLastNormalize) - if err != nil { + if err := alerter.CatalogPool.QueryRow( + ctx, "SELECT now()-max(end_time) FROM peerdb_stats.cdc_batches WHERE flow_name=$1", alertKeys.FlowName, + ).Scan(&intervalSinceLastNormalize); err != nil { logger.Warn("failed to get interval since last normalize", slog.Any("error", err)) } // what if the first normalize errors out/hangs? @@ -1244,12 +1243,9 @@ func (c *PostgresConnector) HandleSlotInfo( } func getOpenConnectionsForUser(ctx context.Context, conn *pgx.Conn, user string) (*protos.GetOpenConnectionsForUserResult, error) { - row := conn.QueryRow(ctx, getNumConnectionsForUser, user) - // COUNT() returns BIGINT var result pgtype.Int8 - err := row.Scan(&result) - if err != nil { + if err := conn.QueryRow(ctx, getNumConnectionsForUser, user).Scan(&result); err != nil { return nil, fmt.Errorf("error while reading result row: %w", err) } @@ -1260,12 +1256,9 @@ func getOpenConnectionsForUser(ctx context.Context, conn *pgx.Conn, user string) } func getOpenReplicationConnectionsForUser(ctx context.Context, conn *pgx.Conn, user string) (*protos.GetOpenConnectionsForUserResult, error) { - row := conn.QueryRow(ctx, getNumReplicationConnections, user) - // COUNT() returns BIGINT var result pgtype.Int8 - err := row.Scan(&result) - if err != nil { + if err := conn.QueryRow(ctx, getNumReplicationConnections, user).Scan(&result); err != nil { return nil, fmt.Errorf("error while reading result row: %w", err) } @@ -1483,8 +1476,7 @@ func (c *PostgresConnector) RemoveTableEntriesFromRawTable( func (c *PostgresConnector) GetVersion(ctx context.Context) (string, error) { var version string - err := c.conn.QueryRow(ctx, "SELECT version()").Scan(&version) - if err != nil { + if err := c.conn.QueryRow(ctx, "SELECT version()").Scan(&version); err != nil { return "", err } c.logger.Info("[postgres] version", slog.String("version", version)) diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index 2b5729f679..ca9665a317 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -145,13 +145,11 @@ func (c *PostgresConnector) CheckReplicationConnectivity(ctx context.Context) er func (c *PostgresConnector) CheckPublicationCreationPermissions(ctx context.Context, srcTableNames []string) error { pubName := "_peerdb_tmp_test_publication_" + shared.RandomString(5) - err := c.CreatePublication(ctx, srcTableNames, pubName) - if err != nil { + if err := c.CreatePublication(ctx, srcTableNames, pubName); err != nil { return err } - _, err = c.conn.Exec(ctx, "DROP PUBLICATION "+pubName) - if err != nil { + if _, err := c.conn.Exec(ctx, "DROP PUBLICATION "+pubName); err != nil { return fmt.Errorf("failed to drop publication: %v", err) } return nil diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 3cf1f97597..8b28573104 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -11,6 +11,7 @@ import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/require" + "github.com/PeerDB-io/peer-flow/connectors/clickhouse" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -460,11 +461,10 @@ func (s ClickHouseSuite) Test_Replident_Full_Unchanged_TOAST_Updates() { e2e.RequireEnvCanceled(s.t, env) } -// Replicate a table called "table" and a column with hyphen in it -func (s ClickHouseSuite) Test_Weird_Table_And_Column() { - srcTableName := "table" - srcFullName := s.attachSchemaSuffix("\"table\"") - dstTableName := "table" +func (s ClickHouseSuite) WeirdTable(tableName string) { + srcTableName := tableName + srcFullName := s.attachSchemaSuffix(fmt.Sprintf("\"%s\"", tableName)) + dstTableName := tableName _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -474,14 +474,12 @@ func (s ClickHouseSuite) Test_Weird_Table_And_Column() { `, srcFullName)) require.NoError(s.t, err) - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key) VALUES ('init'); - `, srcFullName)) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s (key) VALUES ('init')", srcFullName)) require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("clickhouse_test_weird_table_and_column"), - TableNameMapping: map[string]string{s.attachSchemaSuffix("table"): dstTableName}, + FlowJobName: s.attachSuffix("clickhouse_test_weird_table_" + strings.ReplaceAll(tableName, "-", "_")), + TableNameMapping: map[string]string{s.attachSchemaSuffix(tableName): dstTableName}, Destination: s.Peer().Name, } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) @@ -492,15 +490,57 @@ func (s ClickHouseSuite) Test_Weird_Table_And_Column() { e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,key") - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key) VALUES ('cdc'); - `, srcFullName)) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s (key) VALUES ('cdc')", srcFullName)) require.NoError(s.t, err) e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,key") env.Cancel() e2e.RequireEnvCanceled(s.t, env) + + env = e2e.ExecuteWorkflow(tc, shared.PeerFlowTaskQueue, peerflow.DropFlowWorkflow, &protos.DropFlowInput{ + FlowJobName: flowConnConfig.FlowJobName, + DropFlowStats: false, + FlowConnectionConfigs: flowConnConfig, + }) + e2e.EnvWaitForFinished(s.t, env, 3*time.Minute) + // now test weird names with rename based resync + ch, err := connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig()) + require.NoError(s.t, err) + require.NoError(s.t, ch.Exec(context.Background(), fmt.Sprintf("DROP TABLE `%s`", dstTableName))) + require.NoError(s.t, ch.Close()) + flowConnConfig.Resync = true + env = e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,key") + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) + + env = e2e.ExecuteWorkflow(tc, shared.PeerFlowTaskQueue, peerflow.DropFlowWorkflow, &protos.DropFlowInput{ + FlowJobName: flowConnConfig.FlowJobName, + DropFlowStats: false, + FlowConnectionConfigs: flowConnConfig, + }) + e2e.EnvWaitForFinished(s.t, env, 3*time.Minute) + // now test weird names with exchange based resync + ch, err = connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig()) + require.NoError(s.t, err) + require.NoError(s.t, ch.Exec(context.Background(), fmt.Sprintf("TRUNCATE TABLE `%s`", dstTableName))) + require.NoError(s.t, ch.Close()) + env = e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,key") + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) +} + +func (s ClickHouseSuite) Test_WeirdTable_Keyword() { + s.WeirdTable("table") +} + +func (s ClickHouseSuite) Test_WeirdTable_Dash() { + s.t.SkipNow() // TODO fix avro errors by sanitizing names + s.WeirdTable("table-group") } // large NUMERICs (precision >76) are mapped to String on CH, test diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index bcb0bf48f8..91c5817d40 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -18,18 +18,16 @@ import ( func cleanPostgres(conn *pgx.Conn, suffix string) error { // drop the e2e_test schema with the given suffix if it exists - _, err := conn.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS e2e_test_%s CASCADE", suffix)) - if err != nil { + if _, err := conn.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS e2e_test_%s CASCADE", suffix)); err != nil { return fmt.Errorf("failed to drop e2e_test schema: %w", err) } // drop all open slots with the given suffix - _, err = conn.Exec( + if _, err := conn.Exec( context.Background(), "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1", "%_"+suffix, - ) - if err != nil { + ); err != nil { return fmt.Errorf("failed to drop replication slots: %w", err) } @@ -47,8 +45,7 @@ func cleanPostgres(conn *pgx.Conn, suffix string) error { } for _, pubName := range publications { - _, err = conn.Exec(context.Background(), "DROP PUBLICATION "+pubName) - if err != nil { + if _, err := conn.Exec(context.Background(), "DROP PUBLICATION "+pubName); err != nil { return fmt.Errorf("failed to drop publication %s: %w", pubName, err) } } @@ -65,8 +62,7 @@ func setupPostgresSchema(t *testing.T, conn *pgx.Conn, suffix string) error { } // create an e2e_test schema - _, err = setupTx.Exec(context.Background(), "SELECT pg_advisory_xact_lock(hashtext('Megaton Mile'))") - if err != nil { + if _, err := setupTx.Exec(context.Background(), "SELECT pg_advisory_xact_lock(hashtext('Megaton Mile'))"); err != nil { return fmt.Errorf("failed to get lock: %w", err) } defer func() { @@ -77,12 +73,11 @@ func setupPostgresSchema(t *testing.T, conn *pgx.Conn, suffix string) error { }() // create an e2e_test schema - _, err = setupTx.Exec(context.Background(), "CREATE SCHEMA e2e_test_"+suffix) - if err != nil { + if _, err := setupTx.Exec(context.Background(), "CREATE SCHEMA e2e_test_"+suffix); err != nil { return fmt.Errorf("failed to create e2e_test schema: %w", err) } - _, err = setupTx.Exec(context.Background(), ` + if _, err := setupTx.Exec(context.Background(), ` CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); @@ -95,8 +90,7 @@ func setupPostgresSchema(t *testing.T, conn *pgx.Conn, suffix string) error { LANGUAGE 'sql' VOLATILE SET search_path = 'pg_catalog'; - `) - if err != nil { + `); err != nil { return fmt.Errorf("failed to create utility functions: %w", err) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 9dadc49852..ce134f819a 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -195,10 +195,9 @@ func EnvWaitForCount( func RequireEnvCanceled(t *testing.T, env WorkflowRun) { t.Helper() EnvWaitForFinished(t, env, time.Minute) - err := env.Error() var panicErr *temporal.PanicError var canceledErr *temporal.CanceledError - if err == nil { + if err := env.Error(); err == nil { t.Fatal("Expected workflow to be canceled, not completed") } else if errors.As(err, &panicErr) { t.Fatalf("Workflow panic: %s %s", panicErr.Error(), panicErr.StackTrace()) @@ -217,8 +216,7 @@ func SetupCDCFlowStatusQuery(t *testing.T, env WorkflowRun, config *protos.FlowC response, err := env.Query(shared.FlowStatusQuery, config.FlowJobName) if err == nil { var status protos.FlowStatus - err = response.Get(&status) - if err != nil { + if err := response.Get(&status); err != nil { t.Fatal(err) } else if status == protos.FlowStatus_STATUS_RUNNING { return