pgstream is an open source CDC command-line tool and library that replicates Postgres data, including DDL changes, to any provided output.
- Schema change tracking and replication of DDL changes
- Modular deployment configuration, only requires Postgres
- Schema based message partitioning
- Schema filtering
- Elasticsearch/OpenSearch replication output plugin support
- Webhook support
- Automatic discovery of table primary key/unique not null columns for use as event identity
- Highly customisable modules when used as a library
- Core metrics available via OpenTelemetry
- Extendable support for custom replication output plugins
- Continuous consumption of replication slot with configurable memory guards
pgstream can be used via the readily available CLI or as a library.
Binaries are available for Linux, macOS and Windows; check our Releases.
To install pgstream from source, run the following command:
go install github.com/ApollosProject/pgstream-wal2json@latest
To install pgstream with Homebrew, run the following command:
# macOS or Linux
brew tap xataio/pgstream
brew install pgstream
If you already have an environment with at least Postgres and the resources required by the modules you plan to run, you can skip this step. Otherwise, a Docker setup is available in this repository that starts Postgres, Kafka and OpenSearch (as well as OpenSearch Dashboards for easy visualisation).
docker-compose -f build/docker/docker-compose.yml up
The init command creates the pgstream schema in the configured Postgres database, along with the tables, functions and triggers required to keep track of schema changes. See the Tracking schema changes section for more details. It also creates a replication slot for the configured database, which will be used by the pgstream service.
pgstream init --pgurl "postgres://postgres:postgres@localhost?sslmode=disable"
If there are any issues or if you want to clean up the pgstream setup, you can run the following.
pgstream tear-down --pgurl "postgres://postgres:postgres@localhost?sslmode=disable"
This command will clean up all pgstream state.
The run command requires configuration, which can be provided via environment variables, a config file, or a combination of both. The repo includes some sample configuration files that can be used as guidelines.
Example running pgstream with Postgres -> OpenSearch:
pgstream run -c pg2os.env --log-level trace
Example running pgstream with Postgres -> Kafka and, in a separate terminal, Kafka -> OpenSearch:
pgstream run -c pg2kafka.env --log-level trace
pgstream run -c kafka2os.env --log-level trace
The run command will parse the configuration provided, and initialise the configured modules. It requires at least one listener and one processor.
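
As a rough sketch of a minimal listener plus processor setup, the variables below configure a Postgres listener together with the search batch indexer and the translator. Only variable names documented in the tables below are used. The Postgres URL is the one from the init example above, while the OpenSearch URL is an assumption about a local setup; adjust both to your environment.

```sh
# Listener: Postgres replication slot (same URL as the init example).
export PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost?sslmode=disable"

# Processor: search batch indexer. The OpenSearch address is an assumed local one.
export PGSTREAM_OPENSEARCH_STORE_URL="http://localhost:9200"

# Translator: database where the schema log table is stored.
export PGSTREAM_TRANSLATOR_STORE_POSTGRES_URL="postgres://postgres:postgres@localhost?sslmode=disable"
```

With these exported, running `pgstream run` should pick the configuration up from the environment. The same `KEY=value` pairs could instead go into a file passed with `-c`.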
Here's a list of all the environment variables that can be used to configure the individual modules, along with their descriptions and default values.
Postgres Listener
Environment Variable | Default | Required | Description |
---|---|---|---|
PGSTREAM_POSTGRES_LISTENER_URL | N/A | Yes | URL of the Postgres database to connect to for replication purposes. |
Kafka Listener
Environment Variable | Default | Required | Description |
---|---|---|---|
PGSTREAM_KAFKA_SERVERS | N/A | Yes | URLs for the Kafka servers to connect to. |
PGSTREAM_KAFKA_TOPIC_NAME | N/A | Yes | Name of the Kafka topic to read from. |
PGSTREAM_KAFKA_READER_CONSUMER_GROUP_ID | N/A | Yes | Name of the Kafka consumer group for the WAL Kafka reader. |
PGSTREAM_KAFKA_READER_CONSUMER_GROUP_START_OFFSET | Earliest | No | Kafka offset from which the consumer will start if there's no offset available for the consumer group. |
PGSTREAM_KAFKA_TLS_ENABLED | False | No | Enable TLS connection to the Kafka servers. |
PGSTREAM_KAFKA_TLS_CA_CERT_FILE | "" | When TLS enabled | Path to the CA PEM certificate to use for Kafka TLS authentication. |
PGSTREAM_KAFKA_TLS_CLIENT_CERT_FILE | "" | No | Path to the client PEM certificate to use for Kafka TLS client authentication. |
PGSTREAM_KAFKA_TLS_CLIENT_KEY_FILE | "" | No | Path to the client PEM private key to use for Kafka TLS client authentication. |
PGSTREAM_KAFKA_COMMIT_EXP_BACKOFF_INITIAL_INTERVAL | 0 | No | Initial interval for the exponential backoff policy to be applied to the Kafka commit retries. |
PGSTREAM_KAFKA_COMMIT_EXP_BACKOFF_MAX_INTERVAL | 0 | No | Max interval for the exponential backoff policy to be applied to the Kafka commit retries. |
PGSTREAM_KAFKA_COMMIT_EXP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the exponential backoff policy to be applied to the Kafka commit retries. |
PGSTREAM_KAFKA_COMMIT_BACKOFF_INTERVAL | 0 | No | Constant interval for the backoff policy to be applied to the Kafka commit retries. |
PGSTREAM_KAFKA_COMMIT_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the backoff policy to be applied to the Kafka commit retries. |
Either an exponential or a constant backoff policy can be provided for the Kafka commit retry strategy. If neither is provided, no retries apply.
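
To illustrate, here is a sketch of a Kafka listener configuration that opts into the exponential backoff policy for commit retries. The server address, topic name and consumer group id are made-up values for a local setup, and the duration strings assume the same format shown in the Default columns (for example `1s`, `60s`).

```sh
# Kafka listener. Server address, topic and group names are assumed values.
export PGSTREAM_KAFKA_SERVERS="localhost:9092"
export PGSTREAM_KAFKA_TOPIC_NAME="pgstream"
export PGSTREAM_KAFKA_READER_CONSUMER_GROUP_ID="pgstream-consumer-group"

# Exponential backoff for commit retries: start at 1s, cap at 60s, stop after 5 retries.
# The constant policy variables (PGSTREAM_KAFKA_COMMIT_BACKOFF_*) are left unset.
export PGSTREAM_KAFKA_COMMIT_EXP_BACKOFF_INITIAL_INTERVAL="1s"
export PGSTREAM_KAFKA_COMMIT_EXP_BACKOFF_MAX_INTERVAL="60s"
export PGSTREAM_KAFKA_COMMIT_EXP_BACKOFF_MAX_RETRIES="5"
```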
Kafka Batch Writer
Environment Variable | Default | Required | Description |
---|---|---|---|
PGSTREAM_KAFKA_SERVERS | N/A | Yes | URLs for the Kafka servers to connect to. |
PGSTREAM_KAFKA_TOPIC_NAME | N/A | Yes | Name of the Kafka topic to write to. |
PGSTREAM_KAFKA_TOPIC_PARTITIONS | 1 | No | Number of partitions created for the Kafka topic if auto create is enabled. |
PGSTREAM_KAFKA_TOPIC_REPLICATION_FACTOR | 1 | No | Replication factor used when creating the Kafka topic if auto create is enabled. |
PGSTREAM_KAFKA_TOPIC_AUTO_CREATE | False | No | Auto creation of configured Kafka topic if it doesn't exist. |
PGSTREAM_KAFKA_TLS_ENABLED | False | No | Enable TLS connection to the Kafka servers. |
PGSTREAM_KAFKA_TLS_CA_CERT_FILE | "" | When TLS enabled | Path to the CA PEM certificate to use for Kafka TLS authentication. |
PGSTREAM_KAFKA_TLS_CLIENT_CERT_FILE | "" | No | Path to the client PEM certificate to use for Kafka TLS client authentication. |
PGSTREAM_KAFKA_TLS_CLIENT_KEY_FILE | "" | No | Path to the client PEM private key to use for Kafka TLS client authentication. |
PGSTREAM_KAFKA_WRITER_BATCH_TIMEOUT | 1s | No | Max time interval at which the batch sending to Kafka is triggered. |
PGSTREAM_KAFKA_WRITER_BATCH_BYTES | 1572864 | No | Max size in bytes for a given batch. When this size is reached, the batch is sent to Kafka. |
PGSTREAM_KAFKA_WRITER_BATCH_SIZE | 100 | No | Max number of messages to be sent per batch. When this size is reached, the batch is sent to Kafka. |
PGSTREAM_KAFKA_WRITER_MAX_QUEUE_BYTES | 100MiB | No | Max memory used by the Kafka batch writer for inflight batches. |
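
A possible Kafka batch writer configuration is sketched below. The server address, topic name and partition count are assumptions for a local setup, and the lowercase `true` spelling for boolean values is assumed as well. The batch settings simply restate the documented defaults to show where they would be tuned.

```sh
# Kafka batch writer. Server address and topic name are assumed values.
export PGSTREAM_KAFKA_SERVERS="localhost:9092"
export PGSTREAM_KAFKA_TOPIC_NAME="pgstream"

# Create the topic if it doesn't exist, with 3 partitions and a replication factor of 1.
export PGSTREAM_KAFKA_TOPIC_AUTO_CREATE="true"
export PGSTREAM_KAFKA_TOPIC_PARTITIONS="3"
export PGSTREAM_KAFKA_TOPIC_REPLICATION_FACTOR="1"

# Send a batch once 1s has passed or 100 messages have accumulated (documented defaults).
export PGSTREAM_KAFKA_WRITER_BATCH_TIMEOUT="1s"
export PGSTREAM_KAFKA_WRITER_BATCH_SIZE="100"
```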
Search Batch Indexer
Environment Variable | Default | Required | Description |
---|---|---|---|
PGSTREAM_OPENSEARCH_STORE_URL | N/A | Yes | URL for the opensearch store to connect to (at least one of the URLs must be provided). |
PGSTREAM_ELASTICSEARCH_STORE_URL | N/A | Yes | URL for the elasticsearch store to connect to (at least one of the URLs must be provided). |
PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT | 1s | No | Max time interval at which the batch sending to the search store is triggered. |
PGSTREAM_SEARCH_INDEXER_BATCH_SIZE | 100 | No | Max number of messages to be sent per batch. When this size is reached, the batch is sent to the search store. |
PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES | 100MiB | No | Max memory used by the search batch indexer for inflight batches. |
PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_INITIAL_INTERVAL | 0 | No | Initial interval for the exponential backoff policy to be applied to the search indexer cleanup retries. |
PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_INTERVAL | 0 | No | Max interval for the exponential backoff policy to be applied to the search indexer cleanup retries. |
PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the exponential backoff policy to be applied to the search indexer cleanup retries. |
PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_INTERVAL | 0 | No | Constant interval for the backoff policy to be applied to the search indexer cleanup retries. |
PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the backoff policy to be applied to the search indexer cleanup retries. |
PGSTREAM_SEARCH_STORE_EXP_BACKOFF_INITIAL_INTERVAL | 1s | No | Initial interval for the exponential backoff policy to be applied to the search store operation retries. |
PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_INTERVAL | 1min | No | Max interval for the exponential backoff policy to be applied to the search store operation retries. |
PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the exponential backoff policy to be applied to the search store operation retries. |
PGSTREAM_SEARCH_STORE_BACKOFF_INTERVAL | 0 | No | Constant interval for the backoff policy to be applied to the search store operation retries. |
PGSTREAM_SEARCH_STORE_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the backoff policy to be applied to the search store operation retries. |
Either an exponential or a constant backoff policy can be provided for the search indexer cleanup retry strategy. If neither is provided, no retries apply.
Either an exponential or a constant backoff policy can be provided for the search store retry strategy. If neither is provided, a default exponential backoff policy applies.
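
The two retry strategies are configured independently. The sketch below picks a constant policy for search store operations and an exponential one for the indexer cleanup. All interval and retry values are illustrative, and it is an assumption that setting only the constant variables is enough to replace the default exponential policy for the search store.

```sh
# Search store operations: constant backoff, retrying every 5s up to 3 times.
# Assumption: providing only these selects the constant policy over the default one.
export PGSTREAM_SEARCH_STORE_BACKOFF_INTERVAL="5s"
export PGSTREAM_SEARCH_STORE_BACKOFF_MAX_RETRIES="3"

# Search indexer cleanup: exponential backoff from 1s up to 60s, at most 5 retries.
export PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_INITIAL_INTERVAL="1s"
export PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_INTERVAL="60s"
export PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_RETRIES="5"
```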
Webhook Notifier
Environment Variable | Default | Required | Description |
---|---|---|---|
PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_URL | N/A | Yes | URL for the webhook subscription store to connect to. |
PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_CACHE_ENABLED | False | No | Caching applied to the subscription store retrieval queries. |
PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_CACHE_REFRESH_INTERVAL | 60s | When cache enabled | Interval at which the subscription store cache will be refreshed. Indicates max cache staleness. |
PGSTREAM_WEBHOOK_NOTIFIER_MAX_QUEUE_BYTES | 100MiB | No | Max memory used by the webhook notifier for inflight notifications. |
PGSTREAM_WEBHOOK_NOTIFIER_WORKER_COUNT | 10 | No | Max number of concurrent workers that will send webhook notifications for a given WAL event. |
PGSTREAM_WEBHOOK_NOTIFIER_CLIENT_TIMEOUT | 10s | No | Max time the notifier will wait for a response from a webhook URL before timing out. |
PGSTREAM_WEBHOOK_SUBSCRIPTION_SERVER_ADDRESS | ":9900" | No | Address for the subscription server to listen on. |
PGSTREAM_WEBHOOK_SUBSCRIPTION_SERVER_READ_TIMEOUT | 5s | No | Max duration for reading an entire server request, including the body before timing out. |
PGSTREAM_WEBHOOK_SUBSCRIPTION_SERVER_WRITE_TIMEOUT | 10s | No | Max duration before timing out writes of the response. It is reset whenever a new request's header is read. |
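
A webhook notifier setup could look roughly like the following. The subscription store URL is assumed here to point at a Postgres database, and the lowercase `true` spelling for the boolean is also an assumption. The remaining values restate the documented defaults.

```sh
# Subscription store. Assumed here to be a Postgres database.
export PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_URL="postgres://postgres:postgres@localhost?sslmode=disable"

# Cache subscription lookups; cached subscriptions can be up to 60s stale.
export PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_CACHE_ENABLED="true"
export PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_CACHE_REFRESH_INTERVAL="60s"

# Send notifications with up to 10 workers, each waiting at most 10s for a response.
export PGSTREAM_WEBHOOK_NOTIFIER_WORKER_COUNT="10"
export PGSTREAM_WEBHOOK_NOTIFIER_CLIENT_TIMEOUT="10s"

# Subscription server listens on port 9900 (the default).
export PGSTREAM_WEBHOOK_SUBSCRIPTION_SERVER_ADDRESS=":9900"
```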
Translator
Environment Variable | Default | Required | Description |
---|---|---|---|
PGSTREAM_TRANSLATOR_STORE_POSTGRES_URL | N/A | Yes | URL of the Postgres database where the schema log table is stored. |
One of the main differentiators of pgstream is the fact that it tracks and replicates schema changes automatically. It relies on SQL triggers that populate a Postgres table (pgstream.schema_log) containing a history log of all DDL changes for a given schema. Whenever a schema change occurs, a trigger creates a new row in the schema log table with the schema encoded as a JSON value. This table tracks all the schema changes, forming a linearised change log that is then parsed and used within the pgstream pipeline to identify modifications and push the relevant changes downstream.
The detailed SQL used can be found in the migrations folder.
The schema and data changes are part of the same linear stream - the downstream consumers always observe the schema changes as soon as they happen, before any data arrives that relies on the new schema. This prevents data loss and manual intervention.
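
One way to get a feel for the schema log is to look at its rows directly. The helper below is a hypothetical convenience function, not part of pgstream. It assumes `psql` is installed and that the URL passed in points at a database where `pgstream init` has already been run. It selects all columns because the exact table layout is defined in the migrations folder rather than here.

```sh
# Hypothetical helper (not part of pgstream): print a few rows of the schema log.
# $1 is the Postgres URL that was used with `pgstream init`.
schema_log_sample() {
  psql -d "$1" -c 'SELECT * FROM pgstream.schema_log LIMIT 5;'
}
```

It could be called as `schema_log_sample "postgres://postgres:postgres@localhost?sslmode=disable"` after applying a DDL change such as creating a table.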
pgstream is constructed as a streaming pipeline, where data from one module streams into the next, eventually reaching the configured output plugins. It keeps track of schema changes and replicates them along with the data changes to ensure a consistent view of the source data downstream. This modular approach makes adding and integrating output plugin implementations simple and painless.
At a high level the implementation is split into WAL listeners and WAL processors.
A listener is anything that listens for WAL data, regardless of the source. It has a single responsibility: consume and manage the WAL events, delegating the processing of those entries to modules that form the processing pipeline. Depending on the listener implementation, it might be required to also have a checkpointer to flag the events as processed once the processor is done.
There are currently two implementations of the listener:
- Postgres listener: listens to WAL events directly from the replication slot. Since the WAL replication slot is sequential, the Postgres WAL listener is limited to run as a single process. The associated Postgres checkpointer will sync the LSN so that the replication lag doesn't grow indefinitely.
- Kafka reader: reads WAL events from a Kafka topic. It can be configured to run concurrently by using partitions and Kafka consumer groups, applying a fan-out strategy to the WAL events. The data is partitioned by database schema by default, but the partitioning can be configured when using pgstream as a library. The associated Kafka checkpointer will commit the message offsets per topic/partition so that the consumer group doesn't process the same message twice.
A processor processes a WAL event. Depending on the implementation it might also be required to checkpoint the event once it's done processing it as described above.
There are currently three implementations of the processor:
- Kafka batch writer: writes the WAL events into a Kafka topic, using the event schema as the Kafka key for partitioning. This implementation makes it possible to fan out the sequential WAL events, while acting as an intermediate buffer that prevents the replication slot from growing when there are slow consumers. Internally it has a memory guarded buffering system that limits the memory usage of the buffer. The buffer is sent to Kafka based on the configured linger time and maximum size. It treats data and schema events equally, since it doesn't care about the content.
- Search batch indexer: indexes the WAL events into an OpenSearch/Elasticsearch compatible search store. It implements the same kind of mechanism as the Kafka batch writer to ensure continuous processing from the listener, and it also uses a batching mechanism to minimise search store calls. The search mapping logic is configurable when used as a library. The WAL event identity is used as the search store document id, and if no other version is provided, the LSN is used as the document version. Events that do not have an identity are not indexed. Schema events are stored in a separate search store index (pgstream), where the schema log history is kept for use within the search store (i.e., read queries).
- Webhook notifier: sends a notification to any webhooks that have subscribed to the relevant WAL event. It relies on a subscription HTTP server that receives the subscription requests and stores them in the shared subscription store, which is accessed whenever a WAL event is processed. It sends the notifications to the subscribed webhook URLs in parallel, based on a configurable number of workers (client timeouts apply). Like the two previous processor implementations, it uses a memory guarded buffering system internally, which separates the WAL event processing from the sending of webhook notifications and so reduces the processor latency.
In addition to the implementations described above, there's an optional processor decorator, the translator, that injects some of the pgstream logic into the WAL event. This includes:
- Data events:
  - Setting the WAL event identity. If provided, it will use the configured id finder (only available when used as a library), otherwise it will default to using the table primary key/unique not null column.
  - Setting the WAL event version. If provided, it will use the configured version finder (only available when used as a library), otherwise it will default to using the event LSN.
  - Adding pgstream IDs to all columns. This gives each column a constant identifier, so that the column id doesn't change when the column is renamed. This is particularly helpful for the search store, where a rename would require a reindex, which can be costly depending on the data.
- Schema events:
  - Acknowledging the new incoming schema in the Postgres pgstream.schema_log table.
Some of the limitations of the initial release include:
- Single Kafka topic support
- Postgres plugin support limited to wal2json
- Data filtering limited to schema level
- No initial/automatic data backfill
- Primary key/unique not null column required for replication
- Kafka serialisation support limited to JSON
- CDC: Change Data Capture
- WAL: Write Ahead Logging
- LSN: Log Sequence Number
- DDL: Data Definition Language
We welcome contributions from the community! If you'd like to contribute to pgstream, please follow these guidelines:
- Create an issue for any questions, bug reports, or feature requests.
- Check the documentation and existing issues before opening a new issue.
- Fork the repository.
- Create a new branch for your feature or bug fix.
- Make your changes and write tests if applicable.
- Ensure your code passes linting and tests.
  - There's a pre-commit configuration available in the root directory (.pre-commit-config.yaml), which can be used to validate the CI checks locally.
- Submit a pull request.
For this project, we pledge to act and interact in ways that contribute to an open, welcoming, diverse, inclusive, and healthy community.
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
If you have any questions, encounter issues, or need assistance, open an issue in this repository or join our Discord, and our community will be happy to help.
Made with ❤️ by Xata 🦋