From 3cf0e97430b3b3c2633669c750015720e9a76b72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 22 Dec 2023 05:07:38 +0000 Subject: [PATCH] Redirect workflow logs to testing logs --- flow/e2e/bigquery/peer_flow_bq_test.go | 32 ++++++++-------- flow/e2e/bigquery/qrep_flow_bq_test.go | 4 +- flow/e2e/postgres/peer_flow_pg_test.go | 12 +++--- flow/e2e/postgres/qrep_flow_pg_test.go | 4 +- flow/e2e/s3/cdc_s3_test.go | 24 ++++++------ flow/e2e/s3/qrep_flow_s3_test.go | 4 +- flow/e2e/snowflake/peer_flow_sf_test.go | 38 +++++++++---------- flow/e2e/snowflake/qrep_flow_sf_test.go | 12 +++--- .../e2e/sqlserver/qrep_flow_sqlserver_test.go | 2 +- flow/e2e/test_utils.go | 14 +++++-- 10 files changed, 76 insertions(+), 70 deletions(-) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 0aa2266437..be57f6a376 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -145,7 +145,7 @@ func (s PeerFlowE2ETestSuiteBQ) TearDownSuite() { } func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. @@ -167,7 +167,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { } func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_no_data") @@ -211,7 +211,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { } func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_char_coltype") @@ -258,7 +258,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { // The test inserts 10 rows into the source table and verifies that the data is // correctly synced to the destination table after sync flow completes. func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_flow_bq") @@ -325,7 +325,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_1") @@ -393,7 +393,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_2") @@ -457,7 +457,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_3") @@ -531,7 +531,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_4") @@ -598,7 +598,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_5") @@ -665,7 +665,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_types_bq") @@ -744,7 +744,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTable1Name := s.attachSchemaSuffix("test1_bq") @@ -806,7 +806,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { // TODO: not checking schema exactly, add later func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") @@ -906,7 +906,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") @@ -979,7 +979,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") @@ -1056,7 +1056,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") @@ -1129,7 +1129,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_peerdb_cols") diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index 36e53330e9..fab2067717 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -47,7 +47,7 @@ func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsStr } func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -82,7 +82,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { } func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 30be647222..d3cdb9337e 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -42,7 +42,7 @@ func (s PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, ro } func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_flow") @@ -105,7 +105,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { } func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") @@ -266,7 +266,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { } func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") @@ -341,7 +341,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { } func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") @@ -422,7 +422,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { } func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") @@ -499,7 +499,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { } func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_peerdb_cols") diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 422f24a160..74f1478955 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -165,7 +165,7 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { } func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -217,7 +217,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { } func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index c47e4bb56a..e9873656a9 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -19,7 +19,7 @@ func (s PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { } func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) helper, setupErr := setupS3("s3") @@ -62,9 +62,8 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key, value) VALUES ($1, $2) - `, srcTableName), testKey, testValue) + _, err = s.pool.Exec(context.Background(), + fmt.Sprintf("INSERT INTO %s (key, value) VALUES ($1, $2)", srcTableName), testKey, testValue) require.NoError(s.t, err) } require.NoError(s.t, err) @@ -82,9 +81,9 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - fmt.Println("JobName: ", flowJobName) + s.t.Log("JobName", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) - fmt.Println("Files in Test_Complete_Simple_Flow_S3: ", len(files)) + s.t.Log("Files in Test_Complete_Simple_Flow_S3: ", len(files)) require.NoError(s.t, err) require.Equal(s.t, 4, len(files)) @@ -93,8 +92,9 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { } func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) + helper, setupErr := setupS3("gcs") if setupErr != nil { require.Fail(s.t, "failed to setup S3", setupErr) @@ -135,12 +135,10 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key, value) VALUES ($1, $2) - `, srcTableName), testKey, testValue) + _, err = s.pool.Exec(context.Background(), + fmt.Sprintf("INSERT INTO %s (key, value) VALUES ($1, $2)", srcTableName), testKey, testValue) require.NoError(s.t, err) } - fmt.Println("Inserted 20 rows into the source table") require.NoError(s.t, err) }() @@ -156,9 +154,9 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - fmt.Println("JobName: ", flowJobName) + s.t.Log("JobName", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) - fmt.Println("Files in Test_Complete_Simple_Flow_GCS: ", len(files)) + s.t.Log("Files in Test_Complete_Simple_Flow_GCS: ", len(files)) require.NoError(s.t, err) require.Equal(s.t, 4, len(files)) diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index b59d064334..e46e0f3005 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -88,7 +88,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { s.t.Skip("Skipping S3 test") } - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) jobName := "test_complete_flow_s3" @@ -137,7 +137,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { s.t.Skip("Skipping S3 test") } - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) jobName := "test_complete_flow_s3_ctid" diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 32711cabe7..29d51d6337 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -112,7 +112,7 @@ func (s PeerFlowE2ETestSuiteSF) TearDownSuite() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_flow_sf") @@ -187,7 +187,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_replica_identity_no_pkey") @@ -253,7 +253,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { } func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_invalid_geo_sf_avro_cdc") @@ -338,7 +338,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_1") @@ -405,7 +405,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_2") @@ -474,7 +474,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_3") @@ -547,7 +547,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_4") @@ -613,7 +613,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_5") @@ -679,7 +679,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_types_sf") @@ -758,7 +758,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTable1Name := s.attachSchemaSuffix("test1_sf") @@ -817,7 +817,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") @@ -977,7 +977,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") @@ -1050,7 +1050,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") @@ -1126,7 +1126,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") @@ -1198,7 +1198,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_exclude_sf") @@ -1281,7 +1281,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) cmpTableName := s.attachSchemaSuffix("test_softdel") @@ -1368,7 +1368,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") @@ -1451,7 +1451,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) cmpTableName := s.attachSchemaSuffix("test_softdel_ud") @@ -1538,7 +1538,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_softdel_iad") diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 6a84a5d846..86ade072b8 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -56,7 +56,7 @@ func (s PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selecto } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -97,7 +97,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -142,7 +142,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -184,7 +184,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -230,7 +230,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -276,7 +276,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() } func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 6f6458aa46..b10ced83eb 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -140,7 +140,7 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append( s.t.Skip("Skipping SQL Server test") } - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 47f9b84bad..9d2c5b01ba 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "log/slog" - "os" "strings" "testing" "time" @@ -362,12 +361,21 @@ func GetOwnersSelectorString() string { return strings.Join(fields, ",") } -func NewTemporalTestWorkflowEnvironment() *testsuite.TestWorkflowEnvironment { +type tlogWriter struct { + t *testing.T +} + +func (iw tlogWriter) Write(p []byte) (n int, err error) { + iw.t.Log(string(p)) + return len(p), nil +} + +func NewTemporalTestWorkflowEnvironment(t *testing.T) *testsuite.TestWorkflowEnvironment { testSuite := &testsuite.WorkflowTestSuite{} logger := slog.New(logger.NewHandler( slog.NewJSONHandler( - os.Stdout, + tlogWriter{t}, &slog.HandlerOptions{Level: slog.LevelWarn}, ))) tLogger := NewTStructuredLogger(*logger)