Skip to content

Commit

Permalink
kafka + lua scripting + flatbuffers (#1461)
Browse files Browse the repository at this point in the history
In theory kafka could be merged without lua,
instead having some default json encoding,
but that seems a bit pointless
  • Loading branch information
serprex authored Mar 21, 2024
1 parent 2b8a575 commit 7fea7c9
Show file tree
Hide file tree
Showing 46 changed files with 1,462 additions and 103 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ jobs:
env:
PGPASSWORD: postgres

- name: start redpanda
uses: redpanda-data/github-action  # NOTE(review): version garbled to "[email protected]" by page scrape — restore the pinned @vX.Y.Z tag
with:
version: "latest"

- name: Install Temporal CLI
uses: temporalio/setup-temporal@v0

Expand Down
1 change: 1 addition & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func (a *FlowableActivity) SyncFlow(
FlowJobName: flowName,
TableMappings: options.TableMappings,
StagingPath: config.CdcStagingPath,
Script: config.Script,
})
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
Expand Down
11 changes: 7 additions & 4 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,7 @@ func (h *FlowRequestHandler) CreatePeer(
return wrongConfigResponse, nil
}
pgConfig := pgConfigObject.PostgresConfig

encodedConfig, encodingErr = proto.Marshal(pgConfig)

case protos.DBType_SNOWFLAKE:
sfConfigObject, ok := config.(*protos.Peer_SnowflakeConfig)
if !ok {
Expand Down Expand Up @@ -566,13 +564,18 @@ func (h *FlowRequestHandler) CreatePeer(
encodedConfig, encodingErr = proto.Marshal(s3Config)
case protos.DBType_CLICKHOUSE:
chConfigObject, ok := config.(*protos.Peer_ClickhouseConfig)

if !ok {
return wrongConfigResponse, nil
}

chConfig := chConfigObject.ClickhouseConfig
encodedConfig, encodingErr = proto.Marshal(chConfig)
case protos.DBType_KAFKA:
kaConfigObject, ok := config.(*protos.Peer_KafkaConfig)
if !ok {
return wrongConfigResponse, nil
}
kaConfig := kaConfigObject.KafkaConfig
encodedConfig, encodingErr = proto.Marshal(kaConfig)
default:
return wrongConfigResponse, nil
}
Expand Down
4 changes: 4 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse"
conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub"
connkafka "github.com/PeerDB-io/peer-flow/connectors/kafka"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
conns3 "github.com/PeerDB-io/peer-flow/connectors/s3"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
Expand Down Expand Up @@ -202,6 +203,8 @@ func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
return connsqlserver.NewSQLServerConnector(ctx, inner.SqlserverConfig)
case *protos.Peer_ClickhouseConfig:
return connclickhouse.NewClickhouseConnector(ctx, inner.ClickhouseConfig)
case *protos.Peer_KafkaConfig:
return connkafka.NewKafkaConnector(ctx, inner.KafkaConfig)
default:
return nil, ErrUnsupportedFunctionality
}
Expand Down Expand Up @@ -260,6 +263,7 @@ var (
_ CDCSyncConnector = &connbigquery.BigQueryConnector{}
_ CDCSyncConnector = &connsnowflake.SnowflakeConnector{}
_ CDCSyncConnector = &conneventhub.EventHubConnector{}
_ CDCSyncConnector = &connkafka.KafkaConnector{}
_ CDCSyncConnector = &conns3.S3Connector{}
_ CDCSyncConnector = &connclickhouse.ClickhouseConnector{}

Expand Down
12 changes: 2 additions & 10 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,7 @@ func (c *EventHubConnector) GetLastOffset(ctx context.Context, jobName string) (
}

// SetLastOffset persists the latest synced offset for the given job in the
// Postgres metadata store. It simply delegates to UpdateLastOffset; the
// caller is responsible for logging/alerting on failure.
//
// (The rendered diff had merged the removed body and the added body into one
// function, leaving duplicate logic and an unreachable second return — this
// keeps only the post-change implementation.)
func (c *EventHubConnector) SetLastOffset(ctx context.Context, jobName string, offset int64) error {
	return c.pgMetadata.UpdateLastOffset(ctx, jobName, offset)
}

// returns the number of records synced
Expand Down Expand Up @@ -204,9 +198,7 @@ func (c *EventHubConnector) processBatch(
}

func (c *EventHubConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
batch := req.Records

numRecords, err := c.processBatch(ctx, req.FlowJobName, batch)
numRecords, err := c.processBatch(ctx, req.FlowJobName, req.Records)
if err != nil {
c.logger.Error("failed to process batch", slog.Any("error", err))
return nil, err
Expand Down
7 changes: 7 additions & 0 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ func (p *PostgresMetadataStore) Ping(ctx context.Context) error {
return nil
}

// LogFlowInfo records an informational message for the given flow by
// inserting a row into peerdb_stats.flow_errors with error_type "info"
// (the table doubles as a flow event log, not just an error log).
func (p *PostgresMetadataStore) LogFlowInfo(ctx context.Context, flowName string, info string) error {
	const query = "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)"
	if _, err := p.pool.Exec(ctx, query, flowName, info, "info"); err != nil {
		return err
	}
	return nil
}

func (p *PostgresMetadataStore) FetchLastOffset(ctx context.Context, jobName string) (int64, error) {
row := p.pool.QueryRow(ctx,
`SELECT last_offset FROM `+
Expand Down
Loading

0 comments on commit 7fea7c9

Please sign in to comment.