Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove/refactor functions from shared #2307

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
cmap "github.com/orcaman/concurrent-map/v2"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -186,10 +186,10 @@ func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedE

func (m *EventHubManager) getEventHubMgmtClient(subID string) (*armeventhub.EventHubsClient, error) {
if subID == "" {
envSubID, err := utils.GetAzureSubscriptionID()
if err != nil {
slog.Error("failed to get azure subscription id", slog.Any("error", err))
return nil, err
envSubID := peerdbenv.GetEnvString("AZURE_SUBSCRIPTION_ID", "")
if envSubID == "" {
slog.Error("couldn't find AZURE_SUBSCRIPTION_ID in environment")
return nil, errors.New("couldn't find AZURE_SUBSCRIPTION_ID in environment")
}
subID = envSubID
}
Expand Down
8 changes: 3 additions & 5 deletions flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"math/rand/v2"
"testing"
"time"

Expand Down Expand Up @@ -84,11 +85,8 @@ func TestGetQRepPartitions(t *testing.T) {
}
defer conn.Close(context.Background())

// Generate a random schema name
rndUint, err := shared.RandomUInt64()
if err != nil {
t.Fatalf("Failed to generate random uint: %v", err)
}
//nolint:gosec // Generate a random schema name, number has no cryptographic significance
rndUint := rand.Uint64()
schemaName := fmt.Sprintf("test_%d", rndUint)

// Create the schema
Expand Down
14 changes: 5 additions & 9 deletions flow/connectors/postgres/sink_q.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"math/rand/v2"

"github.com/jackc/pgx/v5"

Expand Down Expand Up @@ -35,20 +36,15 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
}
}

randomUint, err := shared.RandomUInt64()
if err != nil {
qe.logger.Error("[pg_query_executor] failed to generate random uint", slog.Any("error", err))
err = fmt.Errorf("[pg_query_executor] failed to generate random uint: %w", err)
stream.Close(err)
return 0, err
}
//nolint:gosec // number has no cryptographic significance
randomUint := rand.Uint64()

cursorName := fmt.Sprintf("peerdb_cursor_%d", randomUint)
fetchSize := shared.FetchAndChannelSize
cursorQuery := fmt.Sprintf("DECLARE %s CURSOR FOR %s", cursorName, query)
qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", cursorQuery, args))
_, err = tx.Exec(ctx, cursorQuery, args...)
if err != nil {

if _, err := tx.Exec(ctx, cursorQuery, args...); err != nil {
qe.logger.Info("[pg_query_executor] failed to declare cursor",
slog.String("cursorQuery", cursorQuery), slog.Any("error", err))
err = fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err)
Expand Down
12 changes: 5 additions & 7 deletions flow/connectors/snowflake/qrep_avro_consolidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"fmt"
"log/slog"
"math/rand/v2"
"strings"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

type SnowflakeAvroConsolidateHandler struct {
Expand Down Expand Up @@ -214,10 +214,8 @@ func (s *SnowflakeAvroConsolidateHandler) generateUpsertMergeCommand(

// handleUpsertMode handles the upsert mode
func (s *SnowflakeAvroConsolidateHandler) handleUpsertMode(ctx context.Context) error {
runID, err := shared.RandomUInt64()
if err != nil {
return fmt.Errorf("failed to generate run ID: %w", err)
}
//nolint:gosec // number has no cryptographic significance
runID := rand.Uint64()

tempTableName := fmt.Sprintf("%s_temp_%d", s.dstTableName, runID)

Expand All @@ -230,8 +228,8 @@ func (s *SnowflakeAvroConsolidateHandler) handleUpsertMode(ctx context.Context)
s.connector.logger.Info("created temp table " + tempTableName)

copyCmd := s.getCopyTransformation(tempTableName)
_, err = s.connector.database.ExecContext(ctx, copyCmd)
if err != nil {

if _, err := s.connector.database.ExecContext(ctx, copyCmd); err != nil {
return fmt.Errorf("failed to run COPY INTO command: %w", err)
}
s.connector.logger.Info("copied file from stage " + s.stage + " to temp table " + tempTableName)
Expand Down
15 changes: 0 additions & 15 deletions flow/connectors/utils/azure.go

This file was deleted.

8 changes: 3 additions & 5 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/big"
"math/rand/v2"
"os"
"strings"
"testing"
Expand All @@ -21,7 +22,6 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

type BigQueryTestHelper struct {
Expand All @@ -37,10 +37,8 @@ type BigQueryTestHelper struct {
func NewBigQueryTestHelper(t *testing.T) (*BigQueryTestHelper, error) {
t.Helper()
// random 64 bit int to namespace stateful schemas.
runID, err := shared.RandomUInt64()
if err != nil {
return nil, fmt.Errorf("failed to generate random uint64: %w", err)
}
//nolint:gosec // number has no cryptographic significance
runID := rand.Uint64()

jsonPath := os.Getenv("TEST_BQ_CREDS")
if jsonPath == "" {
Expand Down
9 changes: 3 additions & 6 deletions flow/e2e/snowflake/snowflake_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand/v2"
"os"
"testing"

Expand All @@ -13,7 +14,6 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

type SnowflakeTestHelper struct {
Expand Down Expand Up @@ -47,11 +47,8 @@ func NewSnowflakeTestHelper(t *testing.T) (*SnowflakeTestHelper, error) {
return nil, fmt.Errorf("failed to unmarshal json: %w", err)
}

runID, err := shared.RandomUInt64()
if err != nil {
return nil, fmt.Errorf("failed to generate random uint64: %w", err)
}

//nolint:gosec // number has no cryptographic significance
runID := rand.Uint64()
testDatabaseName := fmt.Sprintf("e2e_test_%d", runID)

adminClient, err := connsnowflake.NewSnowflakeClient(context.Background(), config)
Expand Down
9 changes: 3 additions & 6 deletions flow/e2e/sqlserver/sqlserver_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package e2e_sqlserver
import (
"context"
"fmt"
"math/rand/v2"
"os"
"strconv"

peersql "github.com/PeerDB-io/peer-flow/connectors/sql"
connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

type SQLServerHelper struct {
Expand Down Expand Up @@ -45,11 +45,8 @@ func NewSQLServerHelper() (*SQLServerHelper, error) {
return nil, fmt.Errorf("invalid connection configs: %v", connErr)
}

rndNum, err := shared.RandomUInt64()
if err != nil {
return nil, err
}

//nolint:gosec // number has no cryptographic significance
rndNum := rand.Uint64()
testSchema := fmt.Sprintf("e2e_test_%d", rndNum)
if err := connector.CreateSchema(context.Background(), testSchema); err != nil {
return nil, err
Expand Down
24 changes: 0 additions & 24 deletions flow/shared/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,8 @@ package shared

import (
"crypto/rand"
"encoding/binary"
"errors"
)

// RandomInt64 returns a random 64 bit integer.
func RandomInt64() (int64, error) {
b := make([]byte, 8)
_, err := rand.Read(b)
if err != nil {
return 0, errors.New("could not generate random int64: " + err.Error())
}
// Convert bytes to int64
return int64(binary.LittleEndian.Uint64(b)), nil
}

// RandomUInt64 returns a random 64 bit unsigned integer.
func RandomUInt64() (uint64, error) {
b := make([]byte, 8)
_, err := rand.Read(b)
if err != nil {
return 0, errors.New("could not generate random uint64: " + err.Error())
}
// Convert bytes to uint64
return binary.LittleEndian.Uint64(b), nil
}

func RandomString(n int) string {
const alphanum = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
bytes := make([]byte, n)
Expand Down
File renamed without changes.
Loading