Clickhouse peer: specify S3 fields (#1205)
This PR refactors the Clickhouse peer configuration to include the S3 staging fields (object path, access key ID, secret access key, and region):

```protobuf
message ClickhouseConfig {
  string host = 1;
  uint32 port = 2;
  string user = 3;
  string password = 4;
  string database = 5;
  string s3_path = 6; // path to S3 bucket which will store avro files
  string access_key_id = 7;
  string secret_access_key = 8;
  string region = 9;
}
```
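
For orientation, a peer config carrying the new fields looks roughly like this through the generated Go bindings (a sketch only: the helper name and every value below are placeholders, while the field names match the generated code used in the diffs that follow):

```go
package main

import "github.com/PeerDB-io/peer-flow/generated/protos"

// exampleClickhouseConfig is a hypothetical helper showing the new shape of
// protos.ClickhouseConfig; none of these values are real credentials.
func exampleClickhouseConfig() *protos.ClickhouseConfig {
	return &protos.ClickhouseConfig{
		Host:            "localhost",
		Port:            9000,
		User:            "default",
		Password:        "",
		Database:        "default",
		S3Path:          "s3://avro-clickhouse", // S3 bucket that stages the Avro files
		AccessKeyId:     "<access-key-id>",
		SecretAccessKey: "<secret-access-key>",
		Region:          "us-east-1",
	}
}
```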

The UI for Clickhouse peer creation has been updated accordingly:

![Clickhouse peer creation form with the new S3 stage fields](https://github.com/PeerDB-io/peerdb/assets/65964360/b47a8fcc-3575-4f09-8a0d-74eb9b9a986f)

Validation of the S3 stage is also performed as part of validate peer for Clickhouse.

Nexus code updated accordingly.

This PR has been functionally tested by validating and creating a Clickhouse peer, followed by performing a basic CDC PG -> CH mirror.
Amogh-Bharadwaj authored Feb 5, 2024
1 parent 169112a commit a2c1f69
Showing 12 changed files with 287 additions and 222 deletions.
**flow/connectors/clickhouse/cdc.go** (1 addition, 1 deletion)

```diff
@@ -85,7 +85,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
 	}
 
 	qrepConfig := &protos.QRepConfig{
-		StagingPath:                c.config.S3Integration,
+		StagingPath:                c.config.S3Path,
 		FlowJobName:                req.FlowJobName,
 		DestinationTableIdentifier: strings.ToLower(rawTableIdentifier),
 	}
```
**flow/connectors/clickhouse/clickhouse.go** (33 additions, 3 deletions)

```diff
@@ -10,6 +10,8 @@ import (
 	_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
 
 	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
+	conns3 "github.com/PeerDB-io/peer-flow/connectors/s3"
+	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/shared"
 )
@@ -21,12 +23,28 @@ type ClickhouseConnector struct {
 	tableSchemaMapping map[string]*protos.TableSchema
 	logger             slog.Logger
 	config             *protos.ClickhouseConfig
+	creds              utils.S3PeerCredentials
 }
 
+func ValidateS3(ctx context.Context, bucketUrl string, creds utils.S3PeerCredentials) error {
+	// for validation purposes
+	s3Client, err := utils.CreateS3Client(creds)
+	if err != nil {
+		return fmt.Errorf("failed to create S3 client: %w", err)
+	}
+
+	validErr := conns3.ValidCheck(ctx, s3Client, bucketUrl, nil)
+	if validErr != nil {
+		return validErr
+	}
+
+	return nil
+}
+
 func NewClickhouseConnector(ctx context.Context,
-	clickhouseProtoConfig *protos.ClickhouseConfig,
+	config *protos.ClickhouseConfig,
 ) (*ClickhouseConnector, error) {
-	database, err := connect(ctx, clickhouseProtoConfig)
+	database, err := connect(ctx, config)
 	if err != nil {
 		return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
 	}
@@ -37,14 +55,26 @@ func NewClickhouseConnector(ctx context.Context,
 		return nil, err
 	}
 
+	s3PeerCreds := utils.S3PeerCredentials{
+		AccessKeyID:     config.AccessKeyId,
+		SecretAccessKey: config.SecretAccessKey,
+		Region:          config.Region,
+	}
+
+	validateErr := ValidateS3(ctx, config.S3Path, s3PeerCreds)
+	if validateErr != nil {
+		return nil, fmt.Errorf("failed to validate S3 bucket: %w", validateErr)
+	}
+
 	flowName, _ := ctx.Value(shared.FlowNameKey).(string)
 	return &ClickhouseConnector{
 		ctx:                ctx,
 		database:           database,
 		pgMetadata:         pgMetadata,
 		tableSchemaMapping: nil,
 		logger:             *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
-		config:             clickhouseProtoConfig,
+		config:             config,
+		creds:              s3PeerCreds,
 	}, nil
 }
```
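
Since ValidateS3 is exported, the S3 stage check can also be exercised on its own, without constructing a connector. A minimal sketch, assuming the import alias below and reusing the hypothetical exampleClickhouseConfig helper from earlier:

```go
package main

import (
	"context"
	"log"

	connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse"
	"github.com/PeerDB-io/peer-flow/connectors/utils"
)

func main() {
	cfg := exampleClickhouseConfig() // hypothetical placeholder config from above

	// Build the peer-scoped credentials exactly as NewClickhouseConnector does.
	creds := utils.S3PeerCredentials{
		AccessKeyID:     cfg.AccessKeyId,
		SecretAccessKey: cfg.SecretAccessKey,
		Region:          cfg.Region,
	}

	// Creates an S3 client from the peer's own keys and runs the bucket checks.
	if err := connclickhouse.ValidateS3(context.Background(), cfg.S3Path, creds); err != nil {
		log.Fatalf("S3 stage validation failed: %v", err)
	}
}
```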
**flow/connectors/clickhouse/qrep_avro_sync.go** (6 additions, 6 deletions)

```diff
@@ -35,13 +35,13 @@ func NewClickhouseAvroSyncMethod(
 func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(avroFile *avro.AvroFile) error {
 	stagingPath := s.config.StagingPath
 	if stagingPath == "" {
-		stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Integration // "s3://avro-clickhouse"
+		stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse"
 	}
 	s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
 	if err != nil {
 		return err
 	}
-	awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{})
+	awsCreds, err := utils.GetAWSSecrets(s.connector.creds)
 	avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)
 
 	if err != nil {
@@ -100,7 +100,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
 	startTime := time.Now()
 	dstTableName := config.DestinationTableIdentifier
 
-	stagingPath := s.config.DestinationPeer.GetClickhouseConfig().S3Integration
+	stagingPath := s.config.DestinationPeer.GetClickhouseConfig().S3Path
 
 	schema, err := stream.Schema()
 	if err != nil {
@@ -120,7 +120,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
 	if err != nil {
 		return 0, err
 	}
-	awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{})
+	awsCreds, err := utils.GetAWSSecrets(s.connector.creds)
 	avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)
 
 	if err != nil {
@@ -166,7 +166,7 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
 ) (*avro.AvroFile, error) {
 	stagingPath := s.config.StagingPath // "s3://avro-clickhouse"
 	if stagingPath == "" {
-		stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Integration // "s3://avro-clickhouse"
+		stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse"
 	}
 	ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd,
 		qvalue.QDWHTypeClickhouse)
@@ -178,7 +178,7 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
 	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName
 	s3AvroFileKey = strings.Trim(s3AvroFileKey, "/")
 
-	avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{}) ///utils.S3PeerCredentials{})
+	avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, s.connector.creds)
 	if err != nil {
 		return nil, fmt.Errorf("failed to write records to S3: %w", err)
 	}
```
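
The recurring pattern in this file is a two-step resolution: an explicit StagingPath on the QRep config wins, otherwise the destination peer's own S3Path is used, and every S3 call is now signed with the peer's credentials (s.connector.creds) rather than an empty utils.S3PeerCredentials{}. A hypothetical helper distilling that fallback; the real methods above inline it:

```go
package main

import "github.com/PeerDB-io/peer-flow/generated/protos"

// resolveStagingPath is a hypothetical distillation of the fallback used by
// CopyStageToDestination and writeToAvroFile above.
func resolveStagingPath(cfg *protos.QRepConfig) string {
	if cfg.StagingPath != "" {
		return cfg.StagingPath // an explicit staging path takes precedence
	}
	// Otherwise, stage in the bucket configured on the Clickhouse peer itself.
	return cfg.DestinationPeer.GetClickhouseConfig().S3Path
}
```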
**flow/connectors/s3/s3.go** (5 additions, 8 deletions)

```diff
@@ -124,20 +124,17 @@ func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, meta
 	}
 
 	// check if we can ping external metadata
-	err := metadataDB.Ping()
-	if err != nil {
-		return fmt.Errorf("failed to ping external metadata: %w", err)
+	if metadataDB != nil {
+		err := metadataDB.Ping()
+		if err != nil {
+			return fmt.Errorf("failed to ping external metadata: %w", err)
+		}
 	}
 
 	return nil
 }
 
 func (c *S3Connector) ConnectionActive() error {
-	_, listErr := c.client.ListBuckets(c.ctx, nil)
-	if listErr != nil {
-		return listErr
-	}
-
 	validErr := ValidCheck(c.ctx, &c.client, c.url, c.pgMetadata)
 	if validErr != nil {
 		c.logger.Error("failed to validate s3 connector:", slog.Any("error", validErr))
```
**nexus/analyzer/src/lib.rs** (16 additions, 6 deletions)

```diff
@@ -779,11 +779,6 @@ fn parse_db_options(
             Some(config)
         }
         DbType::Clickhouse => {
-            let s3_int = opts
-                .get("s3_integration")
-                .map(|s| s.to_string())
-                .unwrap_or_default();
-
             let clickhouse_config = ClickhouseConfig {
                 host: opts.get("host").context("no host specified")?.to_string(),
                 port: opts
@@ -803,7 +798,22 @@ fn parse_db_options(
                     .get("database")
                     .context("no default database specified")?
                     .to_string(),
-                s3_integration: s3_int,
+                s3_path: opts
+                    .get("s3_path")
+                    .context("no s3 path specified")?
+                    .to_string(),
+                access_key_id: opts
+                    .get("access_key_id")
+                    .context("no access key id specified")?
+                    .to_string(),
+                secret_access_key: opts
+                    .get("secret_access_key")
+                    .context("no secret access key specified")?
+                    .to_string(),
+                region: opts
+                    .get("region")
+                    .context("no region specified")?
+                    .to_string(),
             };
             let config = Config::ClickhouseConfig(clickhouse_config);
             Some(config)
```
**nexus/catalog/src/lib.rs** (22 additions, 9 deletions)

```diff
@@ -3,7 +3,7 @@ use std::collections::HashMap;
 use anyhow::{anyhow, Context};
 use peer_cursor::{QueryExecutor, QueryOutput, Schema};
 use peer_postgres::{self, ast};
-use pgwire::error::{PgWireResult};
+use pgwire::error::PgWireResult;
 use postgres_connection::{connect_postgres, get_pg_connection_string};
 use prost::Message;
 use pt::{
@@ -165,7 +165,10 @@ impl Catalog {
     pub async fn get_peer_id_i32(&self, peer_name: &str) -> anyhow::Result<i32> {
         let stmt = self
             .pg
-            .prepare_typed("SELECT id FROM public.peers WHERE name = $1", &[types::Type::TEXT])
+            .prepare_typed(
+                "SELECT id FROM public.peers WHERE name = $1",
+                &[types::Type::TEXT],
+            )
             .await?;
 
         self.pg
@@ -179,7 +182,10 @@ impl Catalog {
     pub async fn get_peer_type_for_id(&self, peer_id: i32) -> anyhow::Result<DbType> {
         let stmt = self
             .pg
-            .prepare_typed("SELECT type FROM public.peers WHERE id = $1", &[types::Type::INT4])
+            .prepare_typed(
+                "SELECT type FROM public.peers WHERE id = $1",
+                &[types::Type::INT4],
+            )
             .await?;
 
         self.pg
@@ -251,7 +257,10 @@ impl Catalog {
     pub async fn get_peer_by_id(&self, peer_id: i32) -> anyhow::Result<Peer> {
         let stmt = self
             .pg
-            .prepare_typed("SELECT name, type, options FROM public.peers WHERE id = $1", &[])
+            .prepare_typed(
+                "SELECT name, type, options FROM public.peers WHERE id = $1",
+                &[],
+            )
             .await?;
 
         let rows = self.pg.query(&stmt, &[&peer_id]).await?;
@@ -557,7 +566,10 @@ impl Catalog {
     pub async fn delete_flow_job_entry(&self, flow_job_name: &str) -> anyhow::Result<()> {
         let rows = self
             .pg
-            .execute("DELETE FROM public.flows WHERE name = $1", &[&flow_job_name])
+            .execute(
+                "DELETE FROM public.flows WHERE name = $1",
+                &[&flow_job_name],
+            )
             .await?;
         if rows == 0 {
             return Err(anyhow!("unable to delete flow job metadata"));
@@ -568,7 +580,10 @@ impl Catalog {
     pub async fn check_peer_entry(&self, peer_name: &str) -> anyhow::Result<i64> {
         let peer_check = self
             .pg
-            .query_one("SELECT COUNT(*) FROM public.peers WHERE name = $1", &[&peer_name])
+            .query_one(
+                "SELECT COUNT(*) FROM public.peers WHERE name = $1",
+                &[&peer_name],
+            )
             .await?;
         let peer_count: i64 = peer_check.get(0);
         Ok(peer_count)
@@ -599,9 +614,7 @@ impl Catalog {
 impl QueryExecutor for Catalog {
     #[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))]
     async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput> {
-        peer_postgres::pg_execute(&self.pg, ast::PostgresAst {
-            peername: None,
-        }, stmt).await
+        peer_postgres::pg_execute(&self.pg, ast::PostgresAst { peername: None }, stmt).await
     }
 
     async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>> {
```