⬇️ ⬇️ ⬇️ ⬇️
*** This project has migrated to r2dbc-proxy ***
Samples with r2dbc-proxy are available here: r2dbc-proxy-examples
⬆️ ⬆️ ⬆️ ⬆️
datasource-proxy-r2dbc supports r2dbc-spi 1.0.0.M6. For later versions of r2dbc-spi, please use r2dbc-proxy.
datasource-proxy for R2DBC.
Provides listeners that receive callbacks for query executions and method invocations on the R2DBC SPI.
Callbacks are:
- Before/After query executions when Batch#execute() or Statement#execute() is called.
- Before/After any method calls on ConnectionFactory, Connection, Batch, Statement, and Result.
- Each mapped query result emitted by Publisher<? extends Result>
Here are sample use cases for listeners:
- Query logging
- Slow query detection
- Tracing
- Metrics
- Assertion/Verification
- Connection leak detection
- Transaction check
- Custom logic injection
- etc.
When a query is executed by Batch#execute() or Statement#execute(), the listener receives query callbacks.
The callback contains query execution information (QueryExecutionInfo) such as the query string, execution type, bindings, and execution time.
You can output or log this information.
Sample Output (wrapped for display purpose):
# Statement with no bindings
Thread:reactor-tcp-nio-1(30) Connection:1
Transaction:{Create:1 Rollback:0 Commit:0}
Success:True Time:34
Type:Statement BatchSize:0 BindingsSize:0
Query:["SELECT value FROM test"], Bindings:[]
# Batch query
Thread:reactor-tcp-nio-3(32) Connection:2
Transaction:{Create:1 Rollback:0 Commit:0}
Success:True Time:4
Type:Batch BatchSize:2 BindingsSize:0
Query:["INSERT INTO test VALUES(200)","SELECT value FROM test"], Bindings:[]
# Statement with multiple bindings
Thread:reactor-tcp-nio-1(30) Connection:3
Transaction:{Create:1 Rollback:0 Commit:0}
Success:True Time:21
Type:Statement BatchSize:0 BindingsSize:2
Query:["INSERT INTO test VALUES ($1,$2)"], Bindings:[(100,101),(200,null(int))]
There are two types of slow query detection.
- Detect slow query AFTER query has executed.
- Detect slow query WHILE query is running.
The former is simple. In the afterQuery callback, check the execution time.
If it exceeds the threshold, perform an action such as logging or sending a notification.
To act while a query is still executing and has passed the threshold time, one implementation is to create a watcher that checks running queries and reports the ones that have exceeded the threshold.
There is a plan to port SlowQueryListener from datasource-proxy.
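The watcher idea can be sketched with plain JDK types. This is only an illustration under assumptions: RunningQueryWatcher and its methods are hypothetical names, not part of datasource-proxy-r2dbc. queryStarted and queryFinished would be called from the beforeQuery and afterQuery callbacks, and findSlowQueries from a periodic task.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a library class): tracks running queries so that
// a periodic check can report the ones that have exceeded a threshold.
class RunningQueryWatcher {

    private final Duration threshold;
    // query id -> start time; added before execution, removed after execution
    private final Map<String, Instant> running = new ConcurrentHashMap<>();

    RunningQueryWatcher(Duration threshold) {
        this.threshold = threshold;
    }

    // call from a "before query" callback
    void queryStarted(String queryId, Instant startedAt) {
        running.put(queryId, startedAt);
    }

    // call from an "after query" callback
    void queryFinished(String queryId) {
        running.remove(queryId);
    }

    // call periodically; returns ids of queries still running past the threshold
    List<String> findSlowQueries(Instant now) {
        List<String> slow = new ArrayList<>();
        for (Map.Entry<String, Instant> entry : running.entrySet()) {
            Duration elapsed = Duration.between(entry.getValue(), now);
            if (elapsed.compareTo(threshold) > 0) {
                slow.add(entry.getKey());
            }
        }
        return slow;
    }
}
```

Passing the current time as an argument keeps the check deterministic; a scheduler such as ScheduledExecutorService could call findSlowQueries(Instant.now()) at a fixed rate.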
When any method on the proxy classes (ConnectionFactory, Connection, Batch, Statement, or Result) is called, listeners receive callbacks before and after the invocation.
The output below simply prints the method execution information (MethodExecutionInfo) at each method invocation.
Essentially, this shows the interaction with the R2DBC SPI.
Sample: Execution with transaction (see sample):
1: Thread:34 Connection:1 Time:16 PostgresqlConnectionFactory#create()
2: Thread:34 Connection:1 Time:0 PostgresqlConnection#createStatement()
3: Thread:34 Connection:1 Time:0 ExtendedQueryPostgresqlStatement#bind()
4: Thread:34 Connection:1 Time:0 ExtendedQueryPostgresqlStatement#add()
5: Thread:34 Connection:1 Time:5 PostgresqlConnection#beginTransaction()
6: Thread:34 Connection:1 Time:5 ExtendedQueryPostgresqlStatement#execute()
7: Thread:34 Connection:1 Time:3 PostgresqlConnection#commitTransaction()
8: Thread:34 Connection:1 Time:4 PostgresqlConnection#close()
Sample project: Tracing with sleuth
- TracingExecutionListener:
Tracing listener implementation using
On every callback, any obtained information can update metrics.
For example:
- Number of opened connections
- Number of rollbacks
- Method execution time
- Number of queries
- Type of query (SELECT, DELETE, ...)
- Query execution time
- etc.
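As an illustration of the "type of query" metric, here is a small sketch that counts executed queries by their leading keyword. QueryTypeCounter is a hypothetical class written with JDK types only; a real setup would more likely delegate to a metrics library, and the keyword-based classification is deliberately naive.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical metrics holder (not a library class): counts queries per type.
class QueryTypeCounter {

    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    // call from an "after query" callback with the executed query string
    void record(String query) {
        counts.computeIfAbsent(typeOf(query), key -> new LongAdder()).increment();
    }

    // number of recorded queries of the given type, 0 if none
    long count(String type) {
        LongAdder adder = counts.get(type);
        return adder == null ? 0 : adder.sum();
    }

    // naive classification: first whitespace-delimited token, upper-cased
    static String typeOf(String query) {
        String trimmed = query.trim();
        if (trimmed.isEmpty()) {
            return "UNKNOWN";
        }
        return trimmed.split("\\s+")[0].toUpperCase(Locale.ROOT);
    }
}
```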
Sample project: Metrics with micrometer (and log slow queries)
This sample project populates the following metrics:
- Time taken to create a connection
- Commit and rollback counts
- Executed query count
- Slow query count
The key implementation is MetricsExecutionListener, which populates Micrometer metrics and logs slow queries.
Transaction metrics on actuator (/actuator/metrics/r2dbc.transaction)
By inspecting invoked methods and/or executed queries, you can verify your logic has performed as expected.
For example, by keeping track of connection open/close method calls, connection leaks can be detected or verified.
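A minimal sketch of such open/close tracking, using JDK types only. ConnectionLeakTracker is a hypothetical name; the connection id is assumed to be taken from the execution info passed to the method callbacks for ConnectionFactory#create() and Connection#close().

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker (not a library class): remembers connection ids
// between create and close so that unclosed connections can be reported.
class ConnectionLeakTracker {

    private final Set<String> open = ConcurrentHashMap.newKeySet();

    // call after ConnectionFactory#create() with the new connection id
    void connectionOpened(String connectionId) {
        open.add(connectionId);
    }

    // call after Connection#close()
    void connectionClosed(String connectionId) {
        open.remove(connectionId);
    }

    // snapshot of ids that were opened but not closed yet;
    // a non-empty result at the end of a test suggests a leak
    Set<String> openConnections() {
        return Collections.unmodifiableSet(new HashSet<>(open));
    }
}
```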
Another example is to check that a group of target queries is executed on the same connection. This verifies the premise of a transaction: queries must be performed on the same connection in order to be in the same transaction.
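The same-connection check could look roughly like the following. QueryConnectionRecorder is a hypothetical helper built on JDK types only; record would be called from the after-query callback with the connection id and query string. As a simplification, it keeps only the most recent connection for each query string.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical recorder (not a library class): remembers which connection
// executed which query so that a test can assert on it afterwards.
class QueryConnectionRecorder {

    // query string -> id of the connection that last executed it
    private final Map<String, String> connectionByQuery = new HashMap<>();

    // call from an "after query" callback
    synchronized void record(String connectionId, String query) {
        connectionByQuery.put(query, connectionId);
    }

    // true only when every given query was recorded and all share one connection
    synchronized boolean ranOnSameConnection(String... queries) {
        Set<String> connectionIds = new HashSet<>();
        for (String query : queries) {
            String connectionId = connectionByQuery.get(query);
            if (connectionId == null) {
                return false; // query was never executed
            }
            connectionIds.add(connectionId);
        }
        return connectionIds.size() == 1;
    }
}
```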
Any logic can be performed in callbacks. Thus, you can write your own logic to do anything, such as audit logging, sending notifications, or calling an external system.
Sample projects: datasource-proxy-r2dbc-samples
- Tracing with Sleuth
- TracingExecutionListener implementation
ProxyExecutionListener is the foundation listener interface.
It defines callbacks for method invocation, query execution, and query result processing.
// invoked before any method on proxy is called
void beforeMethod(MethodExecutionInfo executionInfo);
// invoked after any method on proxy is called
void afterMethod(MethodExecutionInfo executionInfo);
// invoked before query gets executed
void beforeQuery(QueryExecutionInfo execInfo);
// invoked after query is executed
void afterQuery(QueryExecutionInfo execInfo);
// invoked on processing(subscribing) each query result
void eachQueryResult(QueryExecutionInfo execInfo);
MethodExecutionInfo and QueryExecutionInfo contain contextual information about the method/query execution.
Any method call on a proxied ConnectionFactory, Connection, Batch, Statement, or Result triggers the method callbacks beforeMethod() and afterMethod().
Batch#execute() and Statement#execute() trigger the query callbacks beforeQuery() and afterQuery(). (Specifically, when the returned result publisher is subscribed.)
eachQueryResult() is called on each mapped query result when the publisher returned by Result#map() is subscribed.
LifeCycleListener provides before/after methods for all methods defined on ConnectionFactory, Connection, Batch, Statement, and Result, as well as method executions (beforeMethod, afterMethod), query executions (beforeQuery, afterQuery), and result processing (onEachQueryResult).
This listener is built on top of the method and query interceptor API on ProxyExecutionListener.
For example, if you want to know when a connection is created and closed:
public class ConnectionStartToEndListener implements LifeCycleListener {
    @Override
    public void beforeCreateOnConnectionFactory(MethodExecutionInfo methodExecutionInfo) {
        // callback at ConnectionFactory#create()
    }
    @Override
    public void afterCloseOnConnection(MethodExecutionInfo methodExecutionInfo) {
        // callback at Connection#close()
    }
}
QueryExecutionInfoFormatter converts QueryExecutionInfo to String. It is mainly used for preparing log entries.
Internally, this class holds multiple consumers for QueryExecutionInfo and loops through them to populate the output StringBuilder.
This class implements Function<QueryExecutionInfo,String> and can be used in functional style as well.
// convert all info
QueryExecutionInfoFormatter formatter = QueryExecutionInfoFormatter.showAll();
String str = formatter.format(queryExecutionInfo);
// customize conversion
QueryExecutionInfoFormatter customFormatter = new QueryExecutionInfoFormatter();
customFormatter.addConsumer((execInfo, sb) -> {
    sb.append("MY-QUERY-EXECUTION="); // add prefix
});
customFormatter.newLine(); // new line
customFormatter.showConnection((execInfo, sb) -> {
    // custom conversion
    sb.append("MY-ID=" + execInfo.getConnectionInfo().getConnectionId());
});
// convert it
String customStr = customFormatter.format(queryExecutionInfo);
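The consumer-based design described above can be pictured with a small stand-in class. SimpleFormatter is hypothetical and is not the library's implementation; it only shows how a list of consumers can populate a StringBuilder in order, and how implementing Function allows functional-style use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical stand-in (not the library class): a formatter that is
// composed of consumers, each appending one piece of output.
class SimpleFormatter<T> implements Function<T, String> {

    private final List<BiConsumer<T, StringBuilder>> consumers = new ArrayList<>();

    // register another piece of output; returns this for chaining
    SimpleFormatter<T> addConsumer(BiConsumer<T, StringBuilder> consumer) {
        consumers.add(consumer);
        return this;
    }

    // run every consumer in registration order against one StringBuilder
    String format(T info) {
        StringBuilder sb = new StringBuilder();
        for (BiConsumer<T, StringBuilder> consumer : consumers) {
            consumer.accept(info, sb);
        }
        return sb.toString();
    }

    // Function view, so the formatter can be passed to map(...) style APIs
    @Override
    public String apply(T info) {
        return format(info);
    }
}
```

Because the formatter is a Function, a method reference such as formatter::format or the formatter itself can be handed to any API that maps a value to a String.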
Similar to QueryExecutionInfoFormatter, MethodExecutionInfoFormatter converts MethodExecutionInfo to String.
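MethodExecutionInfoFormatter methodExecutionFormatter = MethodExecutionInfoFormatter.withDefault();
ProxyConnectionFactoryBuilder.create(connectionFactory)
    .onAfterMethod(execInfo ->
        execInfo.map(methodExecutionFormatter::format) // convert
            .doOnNext(System.out::println) // print out to sysout
            .subscribe()) // subscribe so the callback runs
    ...; // remainder of the builder chain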
Use ProxyConnectionFactoryBuilder to create a proxied ConnectionFactory and pass it to the R2DBC client.
// original connection factory
ConnectionFactory connectionFactory = new PostgresqlConnectionFactory(configuration);
// create proxied connection factory
ConnectionFactory proxyConnectionFactory =
    ProxyConnectionFactoryBuilder.create(connectionFactory) // pass original ConnectionFactory
        .onAfterMethod(mono -> {
            ... // callback after method execution
        })
        .onEachQueryResult(mono -> {
            ... // callback for each mapped result
        })
        .onAfterQuery(mono -> {
            ... // callback after query execution
        })
        ...; // end the chain with the builder's terminal call to obtain the ConnectionFactory
// initialize client with the wrapped connection factory
R2dbc client = new R2dbc(proxyConnectionFactory);
local maven install
./mvnw install
datasource-proxy-r2dbc is developed against the following versions.
| datasource-proxy-r2dbc | r2dbc-spi | reactor-core |
|------------------------|-----------|-----------------|
| 0.3-SNAPSHOT | 1.0.0.M6 | Californium-SR2 |
| 0.1, 0.2 | 1.0.0.M6 | Californium-SR2 |
Currently, it is built on a milestone release of r2dbc-spi.
To get milestone releases, the spring-milestones repository needs to be added.
<name>Spring Milestones</name>
compile "net.ttddyy:datasource-proxy-r2dbc:${latest-version}"
NOTE: The artifactId may change in the future.
In the after-query callback, write out the executed query information. This can also be done before query execution; however, some attributes, such as execution time and success status, are only available after query execution.
QueryExecutionInfoFormatter, which converts QueryExecutionInfo to String, can be used out of the box to generate log statements.
QueryExecutionInfoFormatter queryExecutionFormatter = QueryExecutionInfoFormatter.showAll();
ConnectionFactory proxyConnectionFactory =
    ProxyConnectionFactoryBuilder.create(connectionFactory) // wrap original ConnectionFactory
        // on every query execution
        .onAfterQuery(execInfo ->
            execInfo.map(queryExecutionFormatter::format) // convert QueryExecutionInfo to String
                .doOnNext(System.out::println) // print out executed query
                .subscribe()) // subscribe so the callback runs
        ...; // end the chain with the builder's terminal call to obtain the ConnectionFactory
After query execution, check whether the execution time has exceeded the threshold, then perform any action.
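Duration threshold = Duration.of(...);
ConnectionFactory proxyConnectionFactory =
    ProxyConnectionFactoryBuilder.create(connectionFactory) // wrap original ConnectionFactory
        .onAfterQuery(mono -> mono
            // keep only executions that took longer than the threshold
            .filter(execInfo -> threshold.minus(execInfo.getExecuteDuration()).isNegative())
            .doOnNext(execInfo -> {
                // slow query logic
            })
            .subscribe()) // subscribe so the callback runs
        ...; // end the chain with the builder's terminal call to obtain the ConnectionFactory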
TBD for slow query detection while query is executing.
At each method invocation, perform an action such as printing out the invoked method, creating a span, or updating metrics.
MethodExecutionInfoFormatter is used to generate the log string.
MethodExecutionInfoFormatter methodExecutionFormatter = MethodExecutionInfoFormatter.withDefault();
ConnectionFactory proxyConnectionFactory =
    ProxyConnectionFactoryBuilder.create(connectionFactory) // wrap original ConnectionFactory
        // on every method invocation
        .onAfterMethod(execInfo ->
            execInfo.map(methodExecutionFormatter::format) // convert MethodExecutionInfo to String
                .doOnNext(System.out::println) // print out method execution (method tracing)
                .subscribe()) // subscribe so the callback runs
        ...; // end the chain with the builder's terminal call to obtain the ConnectionFactory
Client code:
// Simple Transaction Example
this.r2dbc
    .withHandle(handle -> handle
        .inTransaction(h -> h.execute("INSERT INTO test VALUES ($1)", 200)))
// converter: from execution info to String
QueryExecutionInfoFormatter queryExecutionFormatter = QueryExecutionInfoFormatter.showAll();
MethodExecutionInfoFormatter methodExecutionFormatter = MethodExecutionInfoFormatter.withDefault();
ConnectionFactory proxyConnectionFactory =
    ProxyConnectionFactoryBuilder.create(connectionFactory) // wrap original ConnectionFactory
        // on every method invocation
        .onAfterMethod(execInfo ->
            execInfo.map(methodExecutionFormatter::format) // convert MethodExecutionInfo to String
                .doOnNext(System.out::println) // print out method execution (method tracing)
                .subscribe()) // subscribe so the callback runs
        // on every query execution
        .onAfterQuery(execInfo ->
            execInfo.map(queryExecutionFormatter::format) // convert QueryExecutionInfo to String
                .doOnNext(System.out::println) // print out executed query
                .subscribe()) // subscribe so the callback runs
        ...; // end the chain with the builder's terminal call to obtain the ConnectionFactory
// pass the proxied ConnectionFactory to client
this.r2dbc = new R2dbc(proxyConnectionFactory);
Method tracing output:
1: Thread:34 Connection:1 Time:16 PostgresqlConnectionFactory#create()
2: Thread:34 Connection:1 Time:0 PostgresqlConnection#createStatement()
3: Thread:34 Connection:1 Time:0 ExtendedQueryPostgresqlStatement#bind()
4: Thread:34 Connection:1 Time:0 ExtendedQueryPostgresqlStatement#add()
5: Thread:34 Connection:1 Time:5 PostgresqlConnection#beginTransaction()
6: Thread:34 Connection:1 Time:5 ExtendedQueryPostgresqlStatement#execute()
7: Thread:34 Connection:1 Time:3 PostgresqlConnection#commitTransaction()
8: Thread:34 Connection:1 Time:4 PostgresqlConnection#close()
Query output: (wrapped for display)
Thread:reactor-tcp-nio-1(30) Connection:1
Transaction:{Create:1 Rollback:0 Commit:0}
Success:True Time:32
Type:Statement BatchSize:0 BindingsSize:1
Query:["INSERT INTO test VALUES ($1)"] Bindings:[(200)]