Skip to content

Commit

Permalink
Encrypt credentials stored in peers table (#1882)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Jun 27, 2024
1 parent 3e501e0 commit f386772
Show file tree
Hide file tree
Showing 20 changed files with 412 additions and 158 deletions.
17 changes: 11 additions & 6 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,21 +291,26 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon

func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
optionRows, err := a.CatalogPool.Query(ctx, `
SELECT DISTINCT p.name, p.options
FROM peers p
JOIN flows f ON p.id = f.source_peer
WHERE p.type = $1`, protos.DBType_POSTGRES)
SELECT p.name, p.options, p.enc_key_id
FROM peers p
WHERE p.type = $1 AND EXISTS(SELECT * FROM flows f ON p.id = f.source_peer)`, protos.DBType_POSTGRES)
if err != nil {
return nil, err
}

return pgx.CollectRows(optionRows, func(row pgx.CollectableRow) (*protos.Peer, error) {
var peerName string
var peerOptions []byte
err := optionRows.Scan(&peerName, &peerOptions)
var encPeerOptions []byte
var encKeyID string
if err := optionRows.Scan(&peerName, &encPeerOptions, &encKeyID); err != nil {
return nil, err
}

peerOptions, err := connectors.DecryptPeerOptions(encKeyID, encPeerOptions)
if err != nil {
return nil, err
}

var pgPeerConfig protos.PostgresConfig
unmarshalErr := proto.Unmarshal(peerOptions, &pgPeerConfig)
if unmarshalErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (h *FlowRequestHandler) CreatePeer(
}, nil
}

return utils.CreatePeerNoValidate(ctx, h.pool, req.Peer)
return utils.CreatePeerNoValidate(ctx, h.pool, req.Peer, req.AllowUpdate)
}

func (h *FlowRequestHandler) DropPeer(
Expand Down
13 changes: 9 additions & 4 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@ import (
)

func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName string) (*protos.PostgresConfig, error) {
var pgPeerOptions sql.RawBytes
var pgPeerConfig protos.PostgresConfig
var encPeerOptions []byte
var encKeyID string
err := h.pool.QueryRow(ctx,
"SELECT options FROM peers WHERE name = $1 AND type=3", peerName).Scan(&pgPeerOptions)
"SELECT options, enc_key_id FROM peers WHERE name = $1 AND type=3", peerName).Scan(&encPeerOptions, &encKeyID)
if err != nil {
return nil, err
}

err = proto.Unmarshal(pgPeerOptions, &pgPeerConfig)
peerOptions, err := connectors.DecryptPeerOptions(encKeyID, encPeerOptions)
if err != nil {
return nil, fmt.Errorf("failed to load peer: %w", err)
}

var pgPeerConfig protos.PostgresConfig
if err := proto.Unmarshal(peerOptions, &pgPeerConfig); err != nil {
return nil, err
}

Expand Down
27 changes: 24 additions & 3 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_guages"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

type Connector interface {
Expand Down Expand Up @@ -264,13 +265,19 @@ func LoadPeerType(ctx context.Context, catalogPool *pgxpool.Pool, peerName strin

func LoadPeer(ctx context.Context, catalogPool *pgxpool.Pool, peerName string) (*protos.Peer, error) {
row := catalogPool.QueryRow(ctx, `
SELECT type, options
SELECT type, options, enc_key_id
FROM peers
WHERE name = $1`, peerName)

peer := &protos.Peer{Name: peerName}
var peerOptions []byte
if err := row.Scan(&peer.Type, &peerOptions); err != nil {
var encPeerOptions []byte
var encKeyID string
if err := row.Scan(&peer.Type, &encPeerOptions, &encKeyID); err != nil {
return nil, fmt.Errorf("failed to load peer: %w", err)
}

peerOptions, err := DecryptPeerOptions(encKeyID, encPeerOptions)
if err != nil {
return nil, fmt.Errorf("failed to load peer: %w", err)
}

Expand Down Expand Up @@ -354,6 +361,20 @@ func LoadPeer(ctx context.Context, catalogPool *pgxpool.Pool, peerName string) (
return peer, nil
}

func DecryptPeerOptions(encKeyID string, encPeerOptions []byte) ([]byte, error) {
if encKeyID == "" {
return encPeerOptions, nil
}

keys := peerdbenv.PeerDBEncKeys()
key, err := keys.Get(encKeyID)
if err != nil {
return nil, fmt.Errorf("failed to load peer, unable to find encryption key - %s", encKeyID)
}

return key.Decrypt(encPeerOptions)
}

func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
switch inner := config.Config.(type) {
case *protos.Peer_PostgresConfig:
Expand Down
42 changes: 36 additions & 6 deletions flow/connectors/utils/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

func CreatePeerNoValidate(
ctx context.Context,
pool *pgxpool.Pool,
peer *protos.Peer,
allowUpdate bool,
) (*protos.CreatePeerResponse, error) {
config := peer.Config
peerType := peer.Type
Expand Down Expand Up @@ -88,19 +90,29 @@ func CreatePeerNoValidate(
default:
return wrongConfigResponse, nil
}

encodedConfig, encodingErr := proto.Marshal(innerConfig)
if encodingErr != nil {
slog.Error(fmt.Sprintf("failed to encode peer configuration for %s peer %s : %v",
peer.Type, peer.Name, encodingErr))
return nil, encodingErr
}

_, err := pool.Exec(ctx, `
INSERT INTO peers (name, type, options)
VALUES ($1, $2, $3)
ON CONFLICT (name) DO UPDATE
SET type = $2, options = $3`,
peer.Name, peerType, encodedConfig,
encryptedConfig, keyID, err := encryptPeerOptions(encodedConfig)
if err != nil {
return nil, fmt.Errorf("failed to encrypt peer configuration: %w", err)
}

onConflict := "NOTHING"
if allowUpdate {
onConflict = "UPDATE SET type = $2,options = $3,enc_key_id = $4"
}

_, err = pool.Exec(ctx, `
INSERT INTO peers (name, type, options, enc_key_id)
VALUES ($1, $2, $3, $4)
ON CONFLICT (name) DO `+onConflict,
peer.Name, peerType, encryptedConfig, keyID,
)
if err != nil {
return &protos.CreatePeerResponse{
Expand All @@ -115,3 +127,21 @@ func CreatePeerNoValidate(
Message: "",
}, nil
}

func encryptPeerOptions(peerOptions []byte) ([]byte, string, error) {
key, err := peerdbenv.PeerDBCurrentEncKey()
if err != nil {
return nil, "", fmt.Errorf("failed to get current encryption key: %w", err)
}

if key == nil {
return peerOptions, "", nil
}

encrypted, err := key.Encrypt(peerOptions)
if err != nil {
return nil, "", fmt.Errorf("failed to encrypt peer options: %w", err)
}

return encrypted, key.ID, nil
}
2 changes: 1 addition & 1 deletion flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func CreatePeer(t *testing.T, peer *protos.Peer) {
ctx := context.Background()
pool, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx)
require.NoError(t, err)
res, err := utils.CreatePeerNoValidate(ctx, pool, peer)
res, err := utils.CreatePeerNoValidate(ctx, pool, peer, false)
require.NoError(t, err)
if res.Status != protos.CreatePeerStatus_CREATED {
require.Fail(t, res.Message)
Expand Down
14 changes: 14 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,17 @@ func PeerDBAlertingEmailSenderRegion() string {
func PeerDBAlertingEmailSenderReplyToAddresses() string {
return GetEnvString("PEERDB_ALERTING_EMAIL_SENDER_REPLY_TO_ADDRESSES", "")
}

func PeerDBCurrentEncKeyID() string {
return GetEnvString("PEERDB_CURRENT_ENC_KEY_ID", "")
}

func PeerDBEncKeys() shared.PeerDBEncKeys {
return GetEnvJSON[shared.PeerDBEncKeys]("PEERDB_ENC_KEYS", nil)
}

func PeerDBCurrentEncKey() (*shared.PeerDBEncKey, error) {
encKeyID := PeerDBCurrentEncKeyID()
encKeys := PeerDBEncKeys()
return encKeys.Get(encKeyID)
}
15 changes: 15 additions & 0 deletions flow/peerdbenv/env.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package peerdbenv

import (
"encoding/json"
"os"
"reflect"
"strconv"
Expand Down Expand Up @@ -51,3 +52,17 @@ func GetEnvString(name string, defaultValue string) string {

return val
}

func GetEnvJSON[T any](name string, defaultValue T) T {
val, ok := os.LookupEnv(name)
if !ok {
return defaultValue
}

var result T
if err := json.Unmarshal([]byte(val), &result); err != nil {
return defaultValue
}

return result
}
99 changes: 99 additions & 0 deletions flow/shared/enc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package shared

import (
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
"io"

"golang.org/x/crypto/chacha20poly1305"
)

// PeerDBEncKey is a key for encrypting and decrypting data.
type PeerDBEncKey struct {
ID string `json:"id"`
Value string `json:"value"`
}

type PeerDBEncKeys []PeerDBEncKey

func (e PeerDBEncKeys) Get(id string) (*PeerDBEncKey, error) {
if id == "" {
return nil, nil
}

for _, key := range e {
if key.ID == id {
return &key, nil
}
}

return nil, fmt.Errorf("failed to find encryption key - %s", id)
}

const nonceSize = chacha20poly1305.NonceSizeX

// Decrypt decrypts the given ciphertext using the PeerDBEncKey.
func (key *PeerDBEncKey) Decrypt(ciphertext []byte) ([]byte, error) {
if key == nil {
return nil, errors.New("encryption key is nil")
}

decodedKey, err := base64.StdEncoding.DecodeString(key.Value)
if err != nil {
return nil, fmt.Errorf("failed to decode base64 key: %w", err)
}

if len(decodedKey) != 32 {
return nil, errors.New("invalid key length, must be 32 bytes")
}

if len(ciphertext) < nonceSize {
return nil, errors.New("ciphertext too short")
}

nonce := ciphertext[:nonceSize]
ciphertext = ciphertext[nonceSize:]

aead, err := chacha20poly1305.NewX(decodedKey)
if err != nil {
return nil, fmt.Errorf("failed to create ChaCha20-Poly1305: %w", err)
}

plaintext, err := aead.Open(nil, nonce, ciphertext, nil)
if err != nil {
return nil, fmt.Errorf("failed to decrypt: %w", err)
}

return plaintext, nil
}

// Encrypt encrypts the given plaintext using the PeerDBEncKey.
func (key *PeerDBEncKey) Encrypt(plaintext []byte) ([]byte, error) {
if key == nil {
return nil, errors.New("encryption key is nil")
}

decodedKey, err := base64.StdEncoding.DecodeString(key.Value)
if err != nil {
return nil, fmt.Errorf("failed to decode base64 key: %w", err)
}

if len(decodedKey) != 32 {
return nil, errors.New("invalid key length, must be 32 bytes")
}

aead, err := chacha20poly1305.NewX(decodedKey)
if err != nil {
return nil, fmt.Errorf("failed to create ChaCha20-Poly1305: %w", err)
}

nonce := make([]byte, nonceSize)
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
return nil, fmt.Errorf("failed to generate nonce: %w", err)
}

ciphertext := aead.Seal(nonce, nonce, plaintext, nil)
return ciphertext, nil
}
Loading

0 comments on commit f386772

Please sign in to comment.