NamedParameterUtils doesn't support collections with nullable values #845

Closed
chinshaw opened this issue Nov 2, 2023 · 7 comments
Labels: status: feedback-provided (Feedback has been provided), status: waiting-for-triage (An issue we've not yet triaged)


chinshaw commented Nov 2, 2023

We have a case where we are upserting large numbers of daily values into multiple tables. I'll use a simple example from our time-series history, where we use a collection of tuples to upsert values into the history table.

    @Query(
        """
        INSERT INTO time_series_history (time_series_tag_xid, timestamp, value)
            VALUES :tuples
        ON CONFLICT(time_series_tag_xid, timestamp) 
        DO UPDATE SET 
            value = EXCLUDED.value
        """
    )
    suspend fun upsertReturningCount(tuples: List<Array<Any>>)

I have benchmarked this with large collections and it is around 75% faster than our original approach of batching multiple insert statements; however, it only works with non-null values.

If you need nulls, e.g. suspend fun upsertReturningCount(tuples: List<Array<Any?>>), the NamedParameterUtils class fails to check for null before calling bind. I created a hacky workaround but would like advice.

My solution was to override NamedParameterUtils in my project and add the following bind function.

        private void bind(org.springframework.r2dbc.core.binding.BindTarget target, Iterator<BindMarker> markers,
                          Object valueToBind) {

            Assert.isTrue(markers.hasNext(),
                    () -> String.format(
                            "No bind marker for value [%s] in SQL [%s]. Check that the query was expanded using the same arguments.",
                            valueToBind, toQuery()));

            final BindMarker marker = markers.next();

            // Check to see if the valueToBind is of type NullableParameter
            if (valueToBind instanceof NullableParameter<?> nullableParameter) {
                if (nullableParameter.getValue().isPresent()) {
                    marker.bind(target, nullableParameter.getValue().get());
                } else {
                    marker.bindNull(target, nullableParameter.getCls());
                }
            } else {
                marker.bind(target, valueToBind);
            }
        }

The issue I ran into was that I could not find a way to get the parameter's type, so I created a hack class called NullableParameter.

public class NullableParameter<T> {

    private final Class<T> cls;
    private final Optional<T> value;

    public NullableParameter(Class<T> cls, Optional<T> value) {
        this.cls = cls;
        this.value = value;
    }

    public Class<T> getCls() {
        return cls;
    }

    public Optional<T> getValue() {
        return value;
    }

    public static <T> NullableParameter<T> of(Class<T> cls, T value) {
        // Use ofNullable so a null value can actually be represented.
        return new NullableParameter<>(cls, Optional.ofNullable(value));
    }
}

My question: is there any way to check a nullable parameter type? I am assuming that this is not possible but if there was a formal way to pass a nullable parameter to the statement of tuples that would be pretty nice.

Been away from Java for a few years so please don't throw shade on my use of Optionals :)
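For context on why a wrapper is needed at all: a null reference carries no runtime class, so nothing downstream can recover the intended SQL type from the value alone. Here is a minimal, self-contained sketch of that idea (plain Java with a hypothetical `Typed` wrapper, no Spring or R2DBC types):

```java
import java.util.Optional;

public class NullTypeDemo {

    // Hypothetical wrapper (same idea as NullableParameter above):
    // carry the Class alongside the possibly-absent value.
    record Typed<T>(Class<T> cls, Optional<T> value) {}

    static String describe(Object v) {
        // Without a wrapper, a null tells us nothing about its type.
        return v == null ? "unknown type" : v.getClass().getSimpleName();
    }

    static <T> String describe(Typed<T> p) {
        // With the wrapper, the type survives even when the value is absent,
        // so the caller can choose between bind and bindNull.
        return (p.value().isPresent() ? "bind " : "bindNull ") + p.cls().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(describe((Object) null));                               // unknown type
        System.out.println(describe(new Typed<>(String.class, Optional.of("x")))); // bind String
        System.out.println(describe(new Typed<>(String.class, Optional.empty()))); // bindNull String
    }
}
```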

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Nov 2, 2023

mp911de commented Nov 3, 2023

Have you seen io.r2dbc.spi.Parameter and its factory Parameters, which lets you create bind parameter instances carrying both the type and the value?

@mp911de mp911de added the status: waiting-for-feedback We need additional information before we can continue label Nov 3, 2023

chinshaw commented Nov 3, 2023

Thanks @mp911de
Yes, that is what we had been using previously; however, it is slower by a factor of about 3x. Here is an example of the first version of our batching code.

This is the abstract class that all other batching repositories inherit from:

    override fun batchUpsertList(inputFlow: List<T>): Flow<T> {
        return getCurrentAuditor()
            .flatMapMany { auditor ->
                val auditTimestamp = LocalDateTime.now()
                // Create a many connection to execute multiple inserts
                databaseClient.inConnectionMany { conn ->
                    val statement = conn.createStatement(upsertSql())
                    inputFlow.mapIndexed { index, wellInput ->
                        bindStatement(statement, wellInput, auditor, auditTimestamp)
                        if (index < inputFlow.size - 1) {
                            statement.add()
                        }
                    }
                    statement
                        .execute()
                        .toFlux()
                        .flatMap { result ->
                            result.map(rowMapper)
                        }
                        .doFinally {
                            conn.close()
                        }
                }
            }.asFlow()
    }

And here is the code where we bind parameters. This class is mixed into our CoroutineCrudRepository implementations.

interface CasingBatchingRepository : BatchingRepository<CasingEntity>

class CasingBatchingRepositoryImpl(
    auditorAware: ReactiveAuditorAware<String>,
    r2dbcEntityTemplate: R2dbcEntityTemplate
) : AbstractBatchingRepository<CasingEntity>(CasingEntity::class.java, r2dbcEntityTemplate, auditorAware),
    CasingBatchingRepository {

    override fun upsertSql(): String = """
                INSERT INTO casing (wellbore_xid, source_id, top_md, bottom_md, id, od, roughness, run_date, version, created_by, updated_by, created_at, updated_at) 
                    VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
                ON CONFLICT(wellbore_xid, source_id) DO UPDATE SET 
                    wellbore_xid = EXCLUDED.wellbore_xid,
                    source_id = EXCLUDED.source_id,
                    top_md = EXCLUDED.top_md,
                    bottom_md = EXCLUDED.bottom_md,
                    id = EXCLUDED.id,
                    od = EXCLUDED.od,
                    roughness = EXCLUDED.roughness,
                    run_date = EXCLUDED.run_date,
                    version = EXCLUDED.version,
                    updated_by = EXCLUDED.updated_by,
                    updated_at = EXCLUDED.updated_at
                RETURNING *
            """.trimIndent()

    override fun bindStatement(
        statement: Statement,
        casing: CasingEntity,
        auditor: String,
        auditTimestamp: LocalDateTime
    ) {
        statement.bind(0, Parameters.`in`(PostgresqlObjectId.UUID, casing.wellboreXid))
        statement.bind(1, Parameters.`in`(PostgresqlObjectId.VARCHAR, casing.sourceId))
        statement.bind(2, Parameters.`in`(PostgresqlObjectId.FLOAT8, casing.topMd))
        statement.bind(3, Parameters.`in`(PostgresqlObjectId.FLOAT8, casing.bottomMd))
        statement.bind(4, Parameters.`in`(PostgresqlObjectId.FLOAT8, casing.id))
        statement.bind(5, Parameters.`in`(PostgresqlObjectId.FLOAT8, casing.od))
        statement.bind(6, Parameters.`in`(PostgresqlObjectId.FLOAT8, casing.roughness))
        statement.bind(7, Parameters.`in`(PostgresqlObjectId.TIMESTAMP, casing.runDate))
        statement.bind(8, Parameters.`in`(PostgresqlObjectId.INT8, casing.version))
        statement.bind(9, Parameters.`in`(PostgresqlObjectId.VARCHAR, auditor))
        statement.bind(10, Parameters.`in`(PostgresqlObjectId.VARCHAR, auditor))
        statement.bind(11, Parameters.`in`(PostgresqlObjectId.TIMESTAMP, auditTimestamp))
        statement.bind(12, Parameters.`in`(PostgresqlObjectId.TIMESTAMP, auditTimestamp))
    }
}

I found that this code sends a batch of statements, which is less performant than sending a list of tuples in a single statement.

After posting it looks like I should be able to bind the collection of parameters into the single statement. Thoughts?

A follow-up question: looking at the bindStatement method, I am trying to understand how to bind a List<Object[]> parameter.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Nov 3, 2023

mp911de commented Nov 3, 2023

What does INSERT INTO time_series_history (time_series_tag_xid, timestamp, value) VALUES :tuples get rewritten to if you provide proper values?


chinshaw commented Nov 3, 2023

What does INSERT INTO time_series_history (time_series_tag_xid, timestamp, value) VALUES :tuples get rewritten to if you provide proper values?

If I understand your question: the time-series insert works perfectly fine unless a parameter value is null. For instance, a timestep might be missing and we want to record the value as null (not a valid scenario, but for example purposes). The bind call delegates to the public PostgresqlStatement bind(int index, Object value) function, which checks for null before invoking the bind. My hack was to check for a null value and invoke bindNull rather than bind.

Looking further, it may be possible to create a BindMarkerFactory to handle this more gracefully than modifying the NamedParameterUtils class.
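The dispatch decision in that workaround boils down to something like the following minimal sketch. Note the BindTarget interface here is a hypothetical stand-in defined inline for illustration, not Spring's org.springframework.r2dbc.core.binding.BindTarget:

```java
import java.util.Objects;

public class BindDispatchDemo {

    // Hypothetical stand-in for the driver's bind target (illustration only).
    interface BindTarget {
        void bind(int index, Object value);
        void bindNull(int index, Class<?> type);
    }

    // Null-aware dispatch: the driver's bind(...) rejects nulls, so a null
    // must be routed to bindNull together with an explicit type.
    static void bindValue(BindTarget target, int index, Object value, Class<?> type) {
        if (value == null) {
            target.bindNull(index, Objects.requireNonNull(type, "type required for null value"));
        } else {
            target.bind(index, value);
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        BindTarget target = new BindTarget() {
            public void bind(int i, Object v) {
                log.append("bind(").append(i).append(',').append(v).append(");");
            }
            public void bindNull(int i, Class<?> t) {
                log.append("bindNull(").append(i).append(',').append(t.getSimpleName()).append(");");
            }
        };
        bindValue(target, 0, "value1", String.class);
        bindValue(target, 1, null, String.class);
        System.out.println(log); // bind(0,value1);bindNull(1,String);
    }
}
```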


chinshaw commented Nov 3, 2023

I should have started with a simple example test.

create table nullable_test (
    date                        TIMESTAMP WITH TIME ZONE NOT NULL,
    nullable_value              VARCHAR(255),
    PRIMARY KEY (date, nullable_value)
)
@Table("nullable_test")
data class NullableTestEntity(

    @Column("date")
    val date: OffsetDateTime,
    @Column("nullable_value")
    val nullableValue: String? = null
)


interface NullableTestRepository : CoroutineCrudRepository<NullableTestEntity, UUID> {
    @Query(
        """
        INSERT INTO nullable_test (date, nullable_value)
                VALUES :tuples
        ON CONFLICT(date, nullable_value) DO UPDATE SET
            date = EXCLUDED.date,
            nullable_value = EXCLUDED.nullable_value
            RETURNING *
        """
    )
    suspend fun upsert(tuples: List<Array<Any?>>): Flow<NullableTestEntity>
}


@OptIn(ExperimentalCoroutinesApi::class)
@ExtendWith(SpringExtension::class)
@Import(ApiTestConfiguration::class)
@DataR2dbcTest(excludeFilters = [ComponentScan.Filter(value = [SecurityConfiguration::class])])
class NullableTupleTest : R2DBCPostgresTestContainer() {


    @Autowired
    private lateinit var nullableTestRepository: NullableTestRepository

    @Test
    fun testSaveNotNull() = runTest {
        val entities = nullableTestRepository.upsert(
            listOf(
                arrayOf(OffsetDateTime.now(), "value1"),
                arrayOf(OffsetDateTime.now(), "value2")
            )
        )
        entities.onEach(::println)
        entities.collect()
    }

    @Test
    fun testSaveNull() = runTest {
        val entities = nullableTestRepository.upsert(
            listOf(
                arrayOf(OffsetDateTime.now(), "value1"),
                arrayOf(OffsetDateTime.now(), null)
            )
        )
        entities.onEach(::println)
        entities.collect()
    }
}

In the first test everything works fine, but in the second, testSaveNull, I get this exception. This seems to be a case where bindNull should be invoked rather than bind.

java.lang.IllegalArgumentException: value must not be null
	at io.r2dbc.postgresql.util.Assert.requireNonNull(Assert.java:71)
	at io.r2dbc.postgresql.PostgresqlStatement.bind(PostgresqlStatement.java:104)
	at io.r2dbc.postgresql.PostgresqlStatement.bind(PostgresqlStatement.java:58)
	at org.springframework.r2dbc.core.DefaultDatabaseClient$StatementWrapper.bind(DefaultDatabaseClient.java:544)
	at org.springframework.r2dbc.core.binding.IndexedBindMarkers$IndexedBindMarker.bind(IndexedBindMarkers.java:86)
	at org.springframework.data.r2dbc.core.NamedParameterUtils$ExpandedQuery.bind(NamedParameterUtils.java:543)
	at org.springframework.data.r2dbc.core.NamedParameterUtils$ExpandedQuery.bind(NamedParameterUtils.java:522)
	at org.springframework.data.r2dbc.core.NamedParameterUtils$ExpandedQuery.bindTo(NamedParameterUtils.java:599)
	at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery$ExpandedQuery.bindTo(StringBasedR2dbcQuery.java:226)
	at org.springframework.r2dbc.core.DefaultDatabaseClient$DefaultGenericExecuteSpec.lambda$execute$2(DefaultDatabaseClient.java:334)
	at org.springframework.r2dbc.core.DefaultDatabaseClient$DefaultGenericExecuteSpec.lambda$execute$3(DefaultDatabaseClient.java:374)
	at org.springframework.r2dbc.core.ConnectionFunction.apply(ConnectionFunction.java:46)
	at org.springframework.r2dbc.core.ConnectionFunction.apply(ConnectionFunction.java:31)
	at org.springframework.r2dbc.core.DefaultFetchSpec.lambda$all$2(DefaultFetchSpec.java:88)
	at org.springframework.r2dbc.core.ConnectionFunction.apply(ConnectionFunction.java:46)


mp911de commented Nov 6, 2023

Thanks for the detail.

Have you tried wrapping your nullable values in Parameters.in(…)?

            listOf(
                arrayOf(OffsetDateTime.now(), Parameters.in(R2dbcType.VARCHAR, "value1")),
                arrayOf(OffsetDateTime.now(), Parameters.in(R2dbcType.VARCHAR, null))
            )


chinshaw commented Nov 6, 2023

Ah yes, that works and is a much cleaner solution. Thanks Mark!

@chinshaw chinshaw closed this as completed Nov 6, 2023