From 928ee3cf4a26b86fecd7964d7dbba6f91b0105c0 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Wed, 4 Dec 2024 02:53:09 +0530 Subject: [PATCH] remove/refactor functions from shared (#2307) --- flow/connectors/eventhub/hubmanager.go | 10 ++++---- .../postgres/qrep_partition_test.go | 8 +++---- flow/connectors/postgres/sink_q.go | 14 ++++------- .../snowflake/qrep_avro_consolidate.go | 12 ++++------ flow/connectors/utils/azure.go | 15 ------------ flow/e2e/bigquery/bigquery_helper.go | 8 +++---- flow/e2e/snowflake/snowflake_helper.go | 9 +++---- flow/e2e/sqlserver/sqlserver_helper.go | 9 +++---- flow/shared/random.go | 24 ------------------- flow/shared/{worklow.go => workflow.go} | 0 10 files changed, 27 insertions(+), 82 deletions(-) delete mode 100644 flow/connectors/utils/azure.go rename flow/shared/{worklow.go => workflow.go} (100%) diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 3e134968dc..5515eae880 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -14,8 +14,8 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub" cmap "github.com/orcaman/concurrent-map/v2" - "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -186,10 +186,10 @@ func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedE func (m *EventHubManager) getEventHubMgmtClient(subID string) (*armeventhub.EventHubsClient, error) { if subID == "" { - envSubID, err := utils.GetAzureSubscriptionID() - if err != nil { - slog.Error("failed to get azure subscription id", slog.Any("error", err)) - return nil, err + envSubID := peerdbenv.GetEnvString("AZURE_SUBSCRIPTION_ID", "") + if envSubID == "" { + slog.Error("couldn't find AZURE_SUBSCRIPTION_ID in environment") + return nil, errors.New("couldn't find AZURE_SUBSCRIPTION_ID in environment") } subID = envSubID } diff --git a/flow/connectors/postgres/qrep_partition_test.go b/flow/connectors/postgres/qrep_partition_test.go index 0249b75fc1..a81df27698 100644 --- a/flow/connectors/postgres/qrep_partition_test.go +++ b/flow/connectors/postgres/qrep_partition_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "math/rand/v2" "testing" "time" @@ -84,11 +85,8 @@ func TestGetQRepPartitions(t *testing.T) { } defer conn.Close(context.Background()) - // Generate a random schema name - rndUint, err := shared.RandomUInt64() - if err != nil { - t.Fatalf("Failed to generate random uint: %v", err) - } + //nolint:gosec // Generate a random schema name, number has no cryptographic significance + rndUint := rand.Uint64() schemaName := fmt.Sprintf("test_%d", rndUint) // Create the schema diff --git a/flow/connectors/postgres/sink_q.go b/flow/connectors/postgres/sink_q.go index 89dab6a94f..21a39627be 100644 --- a/flow/connectors/postgres/sink_q.go +++ b/flow/connectors/postgres/sink_q.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "math/rand/v2" "github.com/jackc/pgx/v5" @@ -35,20 +36,15 @@ func (stream RecordStreamSink) ExecuteQueryWithTx( } } - randomUint, err := shared.RandomUInt64() - if err != nil { - qe.logger.Error("[pg_query_executor] failed to generate random uint", slog.Any("error", err)) - err = fmt.Errorf("[pg_query_executor] failed to generate random uint: %w", err) - stream.Close(err) - return 0, err - } + //nolint:gosec // number has no cryptographic significance + randomUint := rand.Uint64() cursorName := fmt.Sprintf("peerdb_cursor_%d", randomUint) fetchSize := shared.FetchAndChannelSize cursorQuery := fmt.Sprintf("DECLARE %s CURSOR FOR %s", cursorName, query) qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", cursorQuery, args)) - _, err = tx.Exec(ctx, cursorQuery, args...) - if err != nil { + + if _, err := tx.Exec(ctx, cursorQuery, args...); err != nil { qe.logger.Info("[pg_query_executor] failed to declare cursor", slog.String("cursorQuery", cursorQuery), slog.Any("error", err)) err = fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err) diff --git a/flow/connectors/snowflake/qrep_avro_consolidate.go b/flow/connectors/snowflake/qrep_avro_consolidate.go index 547aef27ef..a4a8d1a285 100644 --- a/flow/connectors/snowflake/qrep_avro_consolidate.go +++ b/flow/connectors/snowflake/qrep_avro_consolidate.go @@ -4,13 +4,13 @@ import ( "context" "fmt" "log/slog" + "math/rand/v2" "strings" "time" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/peerdbenv" - "github.com/PeerDB-io/peer-flow/shared" ) type SnowflakeAvroConsolidateHandler struct { @@ -214,10 +214,8 @@ func (s *SnowflakeAvroConsolidateHandler) generateUpsertMergeCommand( // handleUpsertMode handles the upsert mode func (s *SnowflakeAvroConsolidateHandler) handleUpsertMode(ctx context.Context) error { - runID, err := shared.RandomUInt64() - if err != nil { - return fmt.Errorf("failed to generate run ID: %w", err) - } + //nolint:gosec // number has no cryptographic significance + runID := rand.Uint64() tempTableName := fmt.Sprintf("%s_temp_%d", s.dstTableName, runID) @@ -230,8 +228,8 @@ func (s *SnowflakeAvroConsolidateHandler) handleUpsertMode(ctx context.Context) s.connector.logger.Info("created temp table " + tempTableName) copyCmd := s.getCopyTransformation(tempTableName) - _, err = s.connector.database.ExecContext(ctx, copyCmd) - if err != nil { + + if _, err := s.connector.database.ExecContext(ctx, copyCmd); err != nil { return fmt.Errorf("failed to run COPY INTO command: %w", err) } s.connector.logger.Info("copied file from stage " + s.stage + " to temp table " + tempTableName) diff --git a/flow/connectors/utils/azure.go b/flow/connectors/utils/azure.go deleted file mode 100644 index df612b47d3..0000000000 --- a/flow/connectors/utils/azure.go +++ /dev/null @@ -1,15 +0,0 @@ -package utils - -import ( - "errors" - "os" -) - -func GetAzureSubscriptionID() (string, error) { - // get this from env - id := os.Getenv("AZURE_SUBSCRIPTION_ID") - if id == "" { - return "", errors.New("AZURE_SUBSCRIPTION_ID is not set") - } - return id, nil -} diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 1ee303acf8..ee33f2bfc3 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math/big" + "math/rand/v2" "os" "strings" "testing" @@ -21,7 +22,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" ) type BigQueryTestHelper struct { @@ -37,10 +37,8 @@ type BigQueryTestHelper struct { func NewBigQueryTestHelper(t *testing.T) (*BigQueryTestHelper, error) { t.Helper() // random 64 bit int to namespace stateful schemas. - runID, err := shared.RandomUInt64() - if err != nil { - return nil, fmt.Errorf("failed to generate random uint64: %w", err) - } + //nolint:gosec // number has no cryptographic significance + runID := rand.Uint64() jsonPath := os.Getenv("TEST_BQ_CREDS") if jsonPath == "" { diff --git a/flow/e2e/snowflake/snowflake_helper.go b/flow/e2e/snowflake/snowflake_helper.go index ca57b5b473..7e2943e3bc 100644 --- a/flow/e2e/snowflake/snowflake_helper.go +++ b/flow/e2e/snowflake/snowflake_helper.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "math/rand/v2" "os" "testing" @@ -13,7 +14,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" ) type SnowflakeTestHelper struct { @@ -47,11 +47,8 @@ func NewSnowflakeTestHelper(t *testing.T) (*SnowflakeTestHelper, error) { return nil, fmt.Errorf("failed to unmarshal json: %w", err) } - runID, err := shared.RandomUInt64() - if err != nil { - return nil, fmt.Errorf("failed to generate random uint64: %w", err) - } - + //nolint:gosec // number has no cryptographic significance + runID := rand.Uint64() testDatabaseName := fmt.Sprintf("e2e_test_%d", runID) adminClient, err := connsnowflake.NewSnowflakeClient(context.Background(), config) diff --git a/flow/e2e/sqlserver/sqlserver_helper.go b/flow/e2e/sqlserver/sqlserver_helper.go index 056922800c..d3e1401f24 100644 --- a/flow/e2e/sqlserver/sqlserver_helper.go +++ b/flow/e2e/sqlserver/sqlserver_helper.go @@ -3,6 +3,7 @@ package e2e_sqlserver import ( "context" "fmt" + "math/rand/v2" "os" "strconv" @@ -10,7 +11,6 @@ import ( connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" ) type SQLServerHelper struct { @@ -45,11 +45,8 @@ func NewSQLServerHelper() (*SQLServerHelper, error) { return nil, fmt.Errorf("invalid connection configs: %v", connErr) } - rndNum, err := shared.RandomUInt64() - if err != nil { - return nil, err - } - + //nolint:gosec // number has no cryptographic significance + rndNum := rand.Uint64() testSchema := fmt.Sprintf("e2e_test_%d", rndNum) if err := connector.CreateSchema(context.Background(), testSchema); err != nil { return nil, err diff --git a/flow/shared/random.go b/flow/shared/random.go index 7ef3c8e5dc..84830f3762 100644 --- a/flow/shared/random.go +++ b/flow/shared/random.go @@ -2,32 +2,8 @@ package shared import ( "crypto/rand" - "encoding/binary" - "errors" ) -// RandomInt64 returns a random 64 bit integer. -func RandomInt64() (int64, error) { - b := make([]byte, 8) - _, err := rand.Read(b) - if err != nil { - return 0, errors.New("could not generate random int64: " + err.Error()) - } - // Convert bytes to int64 - return int64(binary.LittleEndian.Uint64(b)), nil -} - -// RandomUInt64 returns a random 64 bit unsigned integer. -func RandomUInt64() (uint64, error) { - b := make([]byte, 8) - _, err := rand.Read(b) - if err != nil { - return 0, errors.New("could not generate random uint64: " + err.Error()) - } - // Convert bytes to uint64 - return binary.LittleEndian.Uint64(b), nil -} - func RandomString(n int) string { const alphanum = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" bytes := make([]byte, n) diff --git a/flow/shared/worklow.go b/flow/shared/workflow.go similarity index 100% rename from flow/shared/worklow.go rename to flow/shared/workflow.go