Skip to content

Commit

Permalink
Merge branch 'main' into fix-go-mod
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored Nov 29, 2024
2 parents 2134580 + 442a4b9 commit 49392b0
Show file tree
Hide file tree
Showing 36 changed files with 799 additions and 608 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ jobs:
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@ea9e4e37992a54ee68a9622e985e60c8e8f12d9f # v3
uses: github/codeql-action/init@f09c1c0a94de965c15400f5634aa42fac8fb8f88 # v3
with:
languages: ${{ matrix.language }}
build-mode: ${{ matrix.build-mode }}

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@ea9e4e37992a54ee68a9622e985e60c8e8f12d9f # v3
uses: github/codeql-action/analyze@f09c1c0a94de965c15400f5634aa42fac8fb8f88 # v3
with:
category: "/language:${{matrix.language}}"
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_INITDB_ARGS: --locale=C.UTF-8
elasticsearch:
image: elasticsearch:8.16.0@sha256:a411f7c17549209c5839b69f929de00bd91f1e2dcf08b65d5f41b122eae17f5e
image: elasticsearch:8.16.1@sha256:e5ee5f8dacbf18fa3ab59a098cc7d4d69f73e61637eb45f1c029e74b1cb200a1
ports:
- 9200:9200
env:
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ PeerDB is an ETL/ELT tool built for PostgreSQL. We implement multiple Postgres n

**From a feature richness standpoint**, we support efficient syncing of tables with large (TOAST) columns. We support multiple streaming modes: log-based (CDC), query-based streaming, etc. We provide rich data-type mapping and plan to support every possible type (incl. custom types) that Postgres supports, to the best extent possible on the target data-store.

### Now available natively in ClickHouse Cloud (Private Preview)

PeerDB is now available natively in ClickHouse Cloud (Private Preview). Learn more about it [here](https://clickhouse.com/cloud/clickpipes/postgres-cdc-connector).

<a href="https://clickhouse.com/cloud/clickpipes/postgres-cdc-connector">
<img src="images/in-clickpipes.png" width="512" />
</a>

#### **Postgres-compatible SQL interface to do ETL**

The Postgres-compatible SQL interface for ETL is unique to PeerDB and enables you to operate in a language you are familiar with. You can do ETL the same way you work with your databases.
Expand Down
2 changes: 1 addition & 1 deletion docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ x-flow-worker-env: &flow-worker-env
services:
catalog:
container_name: catalog
image: postgres:17-alpine@sha256:0d9624535618a135c5453258fd629f4963390338b11aaffb92292c12df3a6c17
image: postgres:17-alpine@sha256:e7897baa70dae1968d23d785adb4aeb699175e0bcaae44f98a7083ecb9668b93
command: -c config_file=/etc/postgresql.conf
ports:
- 9901:5432
Expand Down
12 changes: 6 additions & 6 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ x-flow-worker-env: &flow-worker-env
services:
catalog:
container_name: catalog
image: postgres:17-alpine@sha256:0d9624535618a135c5453258fd629f4963390338b11aaffb92292c12df3a6c17
image: postgres:17-alpine@sha256:e7897baa70dae1968d23d785adb4aeb699175e0bcaae44f98a7083ecb9668b93
command: -c config_file=/etc/postgresql.conf
restart: unless-stopped
ports:
Expand Down Expand Up @@ -112,7 +112,7 @@ services:

flow-api:
container_name: flow_api
image: ghcr.io/peerdb-io/flow-api:latest-stable@sha256:53a6de3d7537b4a90b4ff13d822d0a9fa3015857fc739fc2497d33f33b05dfaa
image: ghcr.io/peerdb-io/flow-api:stable-v0.19.1@sha256:a759b2d1b14f11d09ade672c268abcb456fd8884468547ea0f467cdfb60a0994
restart: unless-stopped
ports:
- 8112:8112
Expand All @@ -128,7 +128,7 @@ services:

flow-snapshot-worker:
container_name: flow-snapshot-worker
image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-stable@sha256:036d6091e32c9d15f2738bc6aab312aa1f412f5c06c57687b497cde233b73d4c
image: ghcr.io/peerdb-io/flow-snapshot-worker:stable-v0.19.1@sha256:894c1fea1cf9a4f5622420d8630509243b60cf177e107ec4d14d7294a9490451
restart: unless-stopped
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
Expand All @@ -138,7 +138,7 @@ services:

flow-worker:
container_name: flow-worker
image: ghcr.io/peerdb-io/flow-worker:latest-stable@sha256:f5d4d5e4e44336d6917e3c8b3d753c77d813d5d1e55ca7fb4d3a3d3d1d3253cc
image: ghcr.io/peerdb-io/flow-worker:stable-v0.19.1@sha256:4482314bd3bd4a96930fbee10c00a9f2d5764e86cfd8802642589d339cf04054
restart: unless-stopped
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
Expand All @@ -151,7 +151,7 @@ services:
peerdb:
container_name: peerdb-server
stop_signal: SIGINT
image: ghcr.io/peerdb-io/peerdb-server:latest-stable@sha256:15249fc45b8b5384fb7a046bc73f75cc679c570a3d2fd3fd8c40c7d7e85f7eef
image: ghcr.io/peerdb-io/peerdb-server:stable-v0.19.1@sha256:c736500e0b42f100df29af43ecf4c96d0c8f4805dd294fecd0bb4ce7b7897a18
restart: unless-stopped
environment:
<<: *catalog-config
Expand All @@ -167,7 +167,7 @@ services:

peerdb-ui:
container_name: peerdb-ui
image: ghcr.io/peerdb-io/peerdb-ui:latest-stable@sha256:f4d1cdf966eb06f4a4a03db4b02593b44c8a37bd32143c937d3c59c2586c4bb1
image: ghcr.io/peerdb-io/peerdb-ui:stable-v0.19.1@sha256:ffc4b5960dc1653a59e680c61fca0ba2c5891cb4965e4662927d9886f4d7f6bc
restart: unless-stopped
ports:
- 3000:3000
Expand Down
33 changes: 15 additions & 18 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/pua"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -287,11 +286,13 @@ func (a *FlowableActivity) MaintainPull(
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, config.Env, a.CatalogPool, config.SourceName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
defer connectors.CloseConnector(ctx, srcConn)

if err := srcConn.SetupReplConn(ctx); err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}

Expand Down Expand Up @@ -407,7 +408,7 @@ func (a *FlowableActivity) StartNormalize(
if errors.Is(err, errors.ErrUnsupported) {
return nil, monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID)
} else if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get normalize connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

Expand All @@ -418,7 +419,7 @@ func (a *FlowableActivity) StartNormalize(

tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, input.FlowConnectionConfigs.FlowJobName)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get table name schema mapping: %w", err)
}

res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
Expand All @@ -436,13 +437,13 @@ func (a *FlowableActivity) StartNormalize(
}
dstType, err := connectors.LoadPeerType(ctx, a.CatalogPool, input.FlowConnectionConfigs.DestinationName)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get peer type: %w", err)
}
if dstType == protos.DBType_POSTGRES {
err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
input.SyncBatchID)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to update end time for cdc batch: %w", err)
}
}

Expand Down Expand Up @@ -757,11 +758,10 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
return
}

slotMetricGauges := peerdb_gauges.SlotMetricGauges{}
slotMetricGauges := otel_metrics.SlotMetricGauges{}
if a.OtelManager != nil {
slotLagGauge, err := otel_metrics.GetOrInitFloat64SyncGauge(a.OtelManager.Meter,
a.OtelManager.Float64GaugesCache,
peerdb_gauges.BuildGaugeName(peerdb_gauges.SlotLagGaugeName),
slotLagGauge, err := a.OtelManager.GetOrInitFloat64Gauge(
otel_metrics.BuildMetricName(otel_metrics.SlotLagGaugeName),
metric.WithUnit("MiBy"),
metric.WithDescription("Postgres replication slot lag in MB"))
if err != nil {
Expand All @@ -770,29 +770,26 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
}
slotMetricGauges.SlotLagGauge = slotLagGauge

openConnectionsGauge, err := otel_metrics.GetOrInitInt64SyncGauge(a.OtelManager.Meter,
a.OtelManager.Int64GaugesCache,
peerdb_gauges.BuildGaugeName(peerdb_gauges.OpenConnectionsGaugeName),
openConnectionsGauge, err := a.OtelManager.GetOrInitInt64Gauge(
otel_metrics.BuildMetricName(otel_metrics.OpenConnectionsGaugeName),
metric.WithDescription("Current open connections for PeerDB user"))
if err != nil {
logger.Error("Failed to get open connections gauge", slog.Any("error", err))
return
}
slotMetricGauges.OpenConnectionsGauge = openConnectionsGauge

openReplicationConnectionsGauge, err := otel_metrics.GetOrInitInt64SyncGauge(a.OtelManager.Meter,
a.OtelManager.Int64GaugesCache,
peerdb_gauges.BuildGaugeName(peerdb_gauges.OpenReplicationConnectionsGaugeName),
openReplicationConnectionsGauge, err := a.OtelManager.GetOrInitInt64Gauge(
otel_metrics.BuildMetricName(otel_metrics.OpenReplicationConnectionsGaugeName),
metric.WithDescription("Current open replication connections for PeerDB user"))
if err != nil {
logger.Error("Failed to get open replication connections gauge", slog.Any("error", err))
return
}
slotMetricGauges.OpenReplicationConnectionsGauge = openReplicationConnectionsGauge

intervalSinceLastNormalizeGauge, err := otel_metrics.GetOrInitFloat64SyncGauge(a.OtelManager.Meter,
a.OtelManager.Float64GaugesCache,
peerdb_gauges.BuildGaugeName(peerdb_gauges.IntervalSinceLastNormalizeGaugeName),
intervalSinceLastNormalizeGauge, err := a.OtelManager.GetOrInitFloat64Gauge(
otel_metrics.BuildMetricName(otel_metrics.IntervalSinceLastNormalizeGaugeName),
metric.WithUnit("s"),
metric.WithDescription("Interval since last normalize"))
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -113,7 +114,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
options *protos.SyncFlowOptions,
sessionID string,
adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error),
pull func(TPull, context.Context, *pgxpool.Pool, *model.PullRecordsRequest[Items]) error,
pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error,
sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error),
) (*model.SyncCompositeResponse, error) {
flowName := config.FlowJobName
Expand Down Expand Up @@ -181,7 +182,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
startTime := time.Now()
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return pull(srcConn, errCtx, a.CatalogPool, &model.PullRecordsRequest[Items]{
return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{
FlowJobName: flowName,
SrcTableIDNameMapping: options.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
Expand Down
22 changes: 19 additions & 3 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/telemetry"
"github.com/PeerDB-io/peer-flow/tags"
)

// alerting service, no cool name :(
Expand Down Expand Up @@ -356,7 +357,7 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId i
return true
}

logger.Info(fmt.Sprintf("Skipped sending alerts: last alert was sent at %s, which was >=%s ago", createdTimestamp.String(), dur.String()))
logger.Info(fmt.Sprintf("Skipped sending alerts: last alert was sent at %s, which was <=%s ago", createdTimestamp.String(), dur.String()))
return false
}

Expand All @@ -366,13 +367,24 @@ func (a *Alerter) sendTelemetryMessage(
flowName string,
more string,
level telemetry.Level,
tags ...string,
additionalTags ...string,
) {
allTags := []string{flowName, peerdbenv.PeerDBDeploymentUID()}
allTags = append(allTags, additionalTags...)

if flowTags, err := tags.GetTags(ctx, a.CatalogPool, flowName); err != nil {
logger.Warn("failed to get flow tags", slog.Any("error", err))
} else {
for key, value := range flowTags {
allTags = append(allTags, fmt.Sprintf("%s:%s", key, value))
}
}

details := fmt.Sprintf("[%s] %s", flowName, more)
attributes := telemetry.Attributes{
Level: level,
DeploymentUID: peerdbenv.PeerDBDeploymentUID(),
Tags: append([]string{flowName, peerdbenv.PeerDBDeploymentUID()}, tags...),
Tags: allTags,
Type: flowName,
}

Expand Down Expand Up @@ -440,6 +452,10 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error)
if errors.As(err, &pgErr) {
tags = append(tags, "pgcode:"+pgErr.Code)
}
var netErr *net.OpError
if errors.As(err, &netErr) {
tags = append(tags, "err:Net")
}
a.sendTelemetryMessage(ctx, logger, flowName, errorWithStack, telemetry.ERROR, tags...)
}

Expand Down
84 changes: 84 additions & 0 deletions flow/cmd/tags_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cmd

import (
"context"
"fmt"
"log/slog"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/tags"
)

// flowExists reports whether the flows table contains a row whose name equals
// flowName.
//
// A query or scan failure is logged and returned together with false. On
// success the outcome is logged at info level before being returned.
func (h *FlowRequestHandler) flowExists(ctx context.Context, flowName string) (bool, error) {
	const existsQuery = "SELECT EXISTS(SELECT 1 FROM flows WHERE name = $1)"

	var found bool
	if scanErr := h.pool.QueryRow(ctx, existsQuery, flowName).Scan(&found); scanErr != nil {
		slog.Error("error checking if flow exists", slog.Any("error", scanErr))
		return false, scanErr
	}

	slog.Info(fmt.Sprintf("flow %s exists: %t", flowName, found))
	return found, nil
}

// CreateOrReplaceFlowTags overwrites the tags column of the flow named in the
// request with the key/value pairs carried by the request.
//
// The flow must already exist; otherwise an error is logged and returned.
// When the request repeats a key, the last occurrence wins because the pairs
// are collapsed into a map before being written. On success the response
// echoes the flow name.
func (h *FlowRequestHandler) CreateOrReplaceFlowTags(
	ctx context.Context,
	in *protos.CreateOrReplaceFlowTagsRequest,
) (*protos.CreateOrReplaceFlowTagsResponse, error) {
	name := in.FlowName

	found, err := h.flowExists(ctx, name)
	switch {
	case err != nil:
		return nil, err
	case !found:
		slog.Error("flow does not exist", slog.String("flow_name", name))
		return nil, fmt.Errorf("flow %s does not exist", name)
	}

	// Collapse the repeated tag messages into the map form that is stored.
	tagMap := make(map[string]string, len(in.Tags))
	for _, entry := range in.Tags {
		tagMap[entry.Key] = entry.Value
	}

	if _, execErr := h.pool.Exec(ctx, "UPDATE flows SET tags = $1 WHERE name = $2", tagMap, name); execErr != nil {
		slog.Error("error updating flow tags", slog.Any("error", execErr))
		return nil, execErr
	}

	resp := &protos.CreateOrReplaceFlowTagsResponse{FlowName: name}
	return resp, nil
}

// GetFlowTags returns the tags stored for the flow named in the request.
//
// The flow must exist; otherwise an error is logged and returned. The tags are
// loaded through tags.GetTags and converted into a slice of FlowTag messages.
// Because the source is a map, the order of the returned tags is unspecified.
func (h *FlowRequestHandler) GetFlowTags(ctx context.Context, in *protos.GetFlowTagsRequest) (*protos.GetFlowTagsResponse, error) {
	name := in.FlowName

	found, err := h.flowExists(ctx, name)
	switch {
	case err != nil:
		return nil, err
	case !found:
		slog.Error("flow does not exist", slog.String("flow_name", name))
		return nil, fmt.Errorf("flow %s does not exist", name)
	}

	// Named flowTags so the imported tags package is not shadowed.
	flowTags, err := tags.GetTags(ctx, h.pool, name)
	if err != nil {
		slog.Error("error getting flow tags", slog.Any("error", err))
		return nil, err
	}

	result := make([]*protos.FlowTag, 0, len(flowTags))
	for k, v := range flowTags {
		pair := &protos.FlowTag{Key: k, Value: v}
		result = append(result, pair)
	}

	resp := &protos.GetFlowTagsResponse{
		FlowName: name,
		Tags:     result,
	}
	return resp, nil
}
Loading

0 comments on commit 49392b0

Please sign in to comment.