Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files (browse the repository at this point in the history)
  • Loading branch information
Amogh-Bharadwaj committed May 10, 2024
2 parents 7ceeecf + 5f904dd commit b6df941
Show file tree
Hide file tree
Showing 115 changed files with 4,339 additions and 1,761 deletions.
30 changes: 15 additions & 15 deletions dev-peerdb.sh
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
#!/bin/sh
if test -z "$USE_PODMAN"
set -Eeu

DOCKER="docker"
EXTRA_ARGS="--no-attach temporal --no-attach pyroscope --no-attach temporal-ui"

if test -n "${USE_PODMAN:=}"
then
if ! command -v docker &> /dev/null
then
if command -v podman-compose
then
echo "docker could not be found on PATH, using podman-compose"
# Exit status 0 means the command was found; we test docker compose for "not found" and only then check for podman compose
if $(docker compose &>/dev/null) && [ $? -ne 0 ]; then
if $(podman compose &>/dev/null) && [ $? -eq 0 ]; then
echo "docker could not be found on PATH, using podman compose"
USE_PODMAN=1
else
echo "docker could not be found on PATH"
echo "docker compose could not be found on PATH"
exit 1
fi
fi
fi

if test -z "$USE_PODMAN"
then
DOCKER="docker compose"
EXTRA_ARGS="--no-attach temporal --no-attach pyroscope --no-attach temporal-ui"
else
DOCKER="podman-compose --podman-run-args=--replace"
EXTRA_ARGS=""
if test -n "$USE_PODMAN"; then
DOCKER="podman"
EXTRA_ARGS="--podman-run-args=--replace"
fi

export PEERDB_VERSION_SHA_SHORT=local-$(git rev-parse --short HEAD)
exec $DOCKER -f docker-compose-dev.yml up --build $EXTRA_ARGS
exec $DOCKER compose -f docker-compose-dev.yml up --build $EXTRA_ARGS
38 changes: 8 additions & 30 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
version: "3.9"

name: peerdb-quickstart-dev

x-minio-config: &minio-config
PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID: _peerdb_minioadmin
PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY: _peerdb_minioadmin
Expand Down Expand Up @@ -145,42 +147,14 @@ services:
temporal-admin-tools:
condition: service_healthy

flow-worker1:
container_name: flow-worker1
build:
context: .
dockerfile: stacks/flow.Dockerfile
target: flow-worker
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
depends_on:
temporal-admin-tools:
condition: service_healthy

flow-worker2:
container_name: flow-worker2
flow-worker:
container_name: flow-worker
build:
context: .
dockerfile: stacks/flow.Dockerfile
target: flow-worker
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
profiles:
- multi
depends_on:
temporal-admin-tools:
condition: service_healthy

flow-worker3:
container_name: flow-worker3
build:
context: .
dockerfile: stacks/flow.Dockerfile
target: flow-worker
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
profiles:
- multi
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down Expand Up @@ -244,3 +218,7 @@ services:
volumes:
pgdata:
minio-data:

networks:
default:
name: peerdb_network
33 changes: 8 additions & 25 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
version: "3.9"

name: peerdb-quickstart

x-minio-config: &minio-config
PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID: _peerdb_minioadmin
PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY: _peerdb_minioadmin
Expand Down Expand Up @@ -124,33 +126,11 @@ services:
temporal-admin-tools:
condition: service_healthy

flow-worker1:
container_name: flow-worker1
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
depends_on:
temporal-admin-tools:
condition: service_healthy

flow-worker2:
container_name: flow-worker2
flow-worker:
container_name: flow-worker
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
profiles:
- multi
depends_on:
temporal-admin-tools:
condition: service_healthy

flow-worker3:
container_name: flow-worker3
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
profiles:
- multi
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down Expand Up @@ -208,5 +188,8 @@ services:
volumes:
pgdata:
prometheusdata:
minio-data:

networks:
default:
name: peerdb_network
2 changes: 1 addition & 1 deletion e2e_cleanup/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/PeerDB-io/peer-flow-cleanup

go 1.22.2
go 1.22.3

require (
cloud.google.com/go/bigquery v1.59.1
Expand Down
42 changes: 35 additions & 7 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/metric"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand All @@ -44,6 +46,7 @@ type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
CdcCache map[string]CdcCacheEntry
OtelManager *otel_metrics.OtelManager
CdcCacheRw sync.RWMutex
}

Expand Down Expand Up @@ -151,7 +154,7 @@ func (a *FlowableActivity) CreateNormalizedTable(
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
conn, err := connectors.GetAs[connectors.NormalizedTablesConnector](ctx, config.PeerConnectionConfig)
if err != nil {
if err == connectors.ErrUnsupportedFunctionality {
if errors.Is(err, errors.ErrUnsupported) {
logger.Info("Connector does not implement normalized tables")
return nil, nil
}
Expand Down Expand Up @@ -295,7 +298,7 @@ func (a *FlowableActivity) StartNormalize(
logger := activity.GetLogger(ctx)

dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
if errors.Is(err, errors.ErrUnsupported) {
err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
input.SyncBatchID)
return nil, err
Expand Down Expand Up @@ -438,7 +441,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
if errors.Is(err, errors.ErrUnsupported) {
return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID)
} else if err != nil {
return err
Expand All @@ -462,7 +465,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dst, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
if errors.Is(err, errors.ErrUnsupported) {
return nil
} else if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
Expand Down Expand Up @@ -575,7 +578,7 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
func() {
srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source)
if err != nil {
if err != connectors.ErrUnsupportedFunctionality {
if !errors.Is(err, errors.ErrUnsupported) {
logger.Error("Failed to create connector to handle slot info", slog.Any("error", err))
}
return
Expand All @@ -592,7 +595,32 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
if ctx.Err() != nil {
return
}
err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName)

var slotLagGauge *otel_metrics.Float64Gauge
var openConnectionsGauge *otel_metrics.Int64Gauge
if a.OtelManager != nil {
slotLagGauge, err = otel_metrics.GetOrInitFloat64Gauge(a.OtelManager.Meter,
a.OtelManager.Float64GaugesCache,
"cdc_slot_lag",
metric.WithUnit("MB"),
metric.WithDescription("Postgres replication slot lag in MB"))
if err != nil {
logger.Error("Failed to get slot lag gauge", slog.Any("error", err))
return
}

openConnectionsGauge, err = otel_metrics.GetOrInitInt64Gauge(a.OtelManager.Meter,
a.OtelManager.Int64GaugesCache,
"open_connections",
metric.WithDescription("Current open connections for PeerDB user"))
if err != nil {
logger.Error("Failed to get open connections gauge", slog.Any("error", err))
return
}
}

err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName,
slotLagGauge, openConnectionsGauge)
if err != nil {
logger.Error("Failed to handle slot info", slog.Any("error", err))
}
Expand All @@ -615,7 +643,7 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context,
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))

if config.SourcePeer.Type != protos.DBType_POSTGRES || last.Range == nil {
if config.SourcePeer.Type != protos.DBType_POSTGRES {
return QRepWaitUntilNewRowsResult{Found: true}, nil
}

Expand Down
38 changes: 28 additions & 10 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/yuin/gopher-lua"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/pua"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -187,13 +189,14 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon

syncStartTime = time.Now()
res, err = sync(dstConn, errCtx, &model.SyncRecordsRequest[Items]{
SyncBatchID: syncBatchID,
Records: recordBatch,
ConsumedOffset: &consumedOffset,
FlowJobName: flowName,
TableMappings: options.TableMappings,
StagingPath: config.CdcStagingPath,
Script: config.Script,
SyncBatchID: syncBatchID,
Records: recordBatch,
ConsumedOffset: &consumedOffset,
FlowJobName: flowName,
TableMappings: options.TableMappings,
StagingPath: config.CdcStagingPath,
Script: config.Script,
TableNameSchemaMapping: options.TableNameSchemaMapping,
})
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
Expand Down Expand Up @@ -342,10 +345,25 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
})
defer shutdown()

var rowsSynced int
bufferSize := shared.FetchAndChannelSize
errGroup, errCtx := errgroup.WithContext(ctx)
stream := model.NewQRecordStream(bufferSize)
outstream := stream
if config.Script != "" {
ls, err := utils.LoadScript(ctx, config.Script, utils.LuaPrintFn(func(s string) {
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, s)
}))
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
lfn := ls.Env.RawGetString("transformRow")
if fn, ok := lfn.(*lua.LFunction); ok {
outstream = pua.AttachToStream(ls, fn, stream)
}
}

var rowsSynced int
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
tmp, err := srcConn.PullQRepRecords(errCtx, config, partition, stream)
if err != nil {
Expand All @@ -362,7 +380,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
})

errGroup.Go(func() error {
rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, stream)
rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, outstream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func APIMain(ctx context.Context, args *APIServerParams) error {
if args.TemporalCert != "" && args.TemporalKey != "" {
slog.Info("Using temporal certificate/key for authentication")

certs, err := Base64DecodeCertAndKey(args.TemporalCert, args.TemporalKey)
certs, err := base64DecodeCertAndKey(args.TemporalCert, args.TemporalKey)
if err != nil {
return fmt.Errorf("unable to base64 decode certificate and key: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/cert.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strings"
)

func Base64DecodeCertAndKey(cert string, key string) ([]tls.Certificate, error) {
func base64DecodeCertAndKey(cert string, key string) ([]tls.Certificate, error) {
temporalCert := strings.TrimSpace(cert)
certBytes, err := base64.StdEncoding.DecodeString(temporalCert)
if err != nil {
Expand Down
Loading

0 comments on commit b6df941

Please sign in to comment.