Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reverts the Support to Upsert/Insert Ignore on PDB #471

Merged
merged 12 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/test-oracle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ jobs:
name: Test Oracle PDB Java ${{ matrix.java-version }}

strategy:
max-parallel: 1
matrix:
java-version: [ 8, 11, 17 ]
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,17 +365,6 @@ protected AbstractBatch(final DatabaseEngine de, final int batchSize, final long
this(de, null, batchSize, batchTimeout, maxAwaitTimeShutdown);
}

/**
* A functional interface to represent a {@link java.util.function.BiConsumer} that throws an exception.
*
* @param <T> the type of the first argument to the operation.
* @param <R> the type of the second argument to the operation.
*/
@FunctionalInterface
private interface FlushConsumer<T, R> {
void accept(T t, R r) throws DatabaseEngineException;
}

/**
* Starts the timer task.
*/
Expand Down Expand Up @@ -463,24 +452,6 @@ public void add(final String entityName, final EntityEntry ee) throws DatabaseEn
* @implSpec Same as {@link #flush(boolean)} with {@code false}.
*/
public void flush() {
logger.trace("Start batch flushing entries.");
flush(this::processBatch);
}

/**
* Flushes the pending batches ignoring duplicate entries.
*/
public void flushUpsert() {
logger.trace("Start batch flushing upserting duplicated entries.");
flush((this::processBatchUpsert));
}

/**
* Flushes the pending batches given a processing callback function.
*
* @param processBatch A (throwing) BiConsumer to process the batch entries.
*/
private void flush(final FlushConsumer<DatabaseEngine, List<BatchEntry>> processBatch) {
this.metricsListener.onFlushTriggered();
final long flushTriggeredMs = System.currentTimeMillis();
List<BatchEntry> temp;
Expand Down Expand Up @@ -514,7 +485,7 @@ private void flush(final FlushConsumer<DatabaseEngine, List<BatchEntry>> process
this.metricsListener.onFlushStarted(flushTriggeredMs, temp.size());
start = System.currentTimeMillis();

processBatch.accept(de, temp);
processBatch(de, temp);

onFlushFinished(flushTriggeredMs, temp, Collections.emptyList());
logger.trace("[{}] Batch flushed. Took {} ms, {} rows.", name, System.currentTimeMillis() - start, temp.size());
Expand All @@ -539,7 +510,7 @@ private void flush(final FlushConsumer<DatabaseEngine, List<BatchEntry>> process
de.rollback();
}

processBatch.accept(de, temp);
processBatch(de, temp);

success = true;
} catch (final InterruptedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,29 +72,4 @@ protected void processBatch(final DatabaseEngine de, final List<BatchEntry> batc
de.flush();
de.commit();
}

/**
* Processes all batch entries ignoring duplicate entries.
*
* @implSpec Same as {@link #processBatch(DatabaseEngine, List)}}.
*
* @param de The {@link DatabaseEngine} on which to perform the flush.
* @param batchEntries The list of batch entries to be flushed.
* @throws DatabaseEngineException If the operation failed.
*/
protected void processBatchUpsert(final DatabaseEngine de, final List<BatchEntry> batchEntries) throws DatabaseEngineException {
/*
Begin transaction before the addBatch calls, in order to force the retry of the connection if it was lost during
or since the last batch. Otherwise, the addBatch call that uses a prepared statement will fail.
*/
de.beginTransaction();

for (final BatchEntry entry : batchEntries) {
de.addBatchUpsert(entry.getTableName(), entry.getEntityEntry());
}

de.flushUpsert();
de.commit();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,4 @@ public interface PdbBatch extends AutoCloseable {
* @return A void {@link CompletableFuture} that completes when the flush action finishes.
*/
CompletableFuture<Void> flushAsync() throws Exception;

/**
* Flushes the pending batches upserting entries to avoid duplicated key violations.
*
* @throws Exception If an error occurs while flushing.
*/
void flushUpsert() throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,6 @@ When done, the future removes itself (if done already, all this can be skipped).
}
}

@Override
public void flushUpsert() {
logger.error("Flush ignoring not available for MultithreadedBatch.");
throw new UnsupportedOperationException("Flushing pending batches upserting duplicated entries is not implemented using multiple threads/connections.");
}

/**
* Flushes the given list batch entries to {@link DatabaseEngine} immediately.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,6 @@ private void closeMappedEntity(final MappedEntity mappedEntity) {
final PreparedStatement insert = mappedEntity.getInsert();
final PreparedStatement insertReturning = mappedEntity.getInsertReturning();
final PreparedStatement insertWithAutoInc = mappedEntity.getInsertWithAutoInc();
final PreparedStatement upsert = mappedEntity.getUpsert();

if (!insert.isClosed()) {
insert.executeBatch();
Expand All @@ -517,10 +516,6 @@ private void closeMappedEntity(final MappedEntity mappedEntity) {
insertWithAutoInc.executeBatch();
}

if (upsert != null && !upsert.isClosed()) {
upsert.executeBatch();
}

} catch (final SQLException e) {
logger.debug(String.format("Failed to flush before closing mapped entity '%s'",
mappedEntity.getEntity().getName()), e);
Expand Down Expand Up @@ -954,29 +949,6 @@ public synchronized void flush() throws DatabaseEngineException {
}
}

/**
* Flushes the batches for all the registered entities, upserting any following .
*
* @throws DatabaseEngineException If something goes wrong while persisting data.
*/
@Override
public synchronized void flushUpsert() throws DatabaseEngineException {
/*
* Reconnect on this method does not make sense since a new connection will have nothing to flush.
*/

try {
for (MappedEntity me : entities.values()) {
final PreparedStatement upsert = me.getUpsert();
if (upsert != null) {
upsert.executeBatch();
}
}
} catch (final Exception ex) {
throw getQueryExceptionHandler().handleException(ex, "Something went wrong while flushing");
}
}

/**
* Commits the current transaction. You should only call this method if you've previously called {@link AbstractDatabaseEngine#beginTransaction()}.
*
Expand Down Expand Up @@ -1015,9 +987,6 @@ public synchronized void rollback() throws DatabaseEngineRuntimeException {
mappedEntity.getInsert().clearBatch();
mappedEntity.getInsertReturning().clearBatch();
mappedEntity.getInsertWithAutoInc().clearBatch();
if (mappedEntity.getUpsert() != null) {
mappedEntity.getUpsert().clearBatch();
}
}
conn.rollback();
conn.setAutoCommit(true);
Expand Down Expand Up @@ -1346,31 +1315,6 @@ public synchronized void addBatch(final String name, final EntityEntry entry) th

}

@Override
public synchronized void addBatchUpsert(final String name, final EntityEntry entry) throws DatabaseEngineException {
try {

final MappedEntity me = entities.get(name);

if (me == null) {
throw new DatabaseEngineException(String.format("Unknown entity '%s'", name));
}

PreparedStatement ps = me.getUpsert();

if (ps == null) {
throw new DatabaseEngineException(String.format("Error adding to batch: Entity %s has a null merge/upsert statement.", name));
}

entityToPreparedStatementForBatch(me.getEntity(), ps, entry, true);

ps.addBatch();
} catch (final Exception ex) {
throw new DatabaseEngineException("Error adding to batch", ex);
}

}

/**
* Translates the given entry entity to the prepared statement when used in the context of batch updates.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,18 +187,6 @@ public interface DatabaseEngine extends AutoCloseable {
*/
void flush() throws DatabaseEngineException;

/**
* Flushes the batches for all the registered entities upserting duplicated entries.
*
* @implNote The default implementation of this method throws an {@link UnsupportedOperationException}
* for backward-compatibility reasons. If this method is supposed to be called, it must be explicitly overridden.
*
* @throws DatabaseEngineException If something goes wrong while persisting data.
*/
default void flushUpsert() throws DatabaseEngineException {
throw new UnsupportedOperationException("Method not implemented.");
}

/**
* Commits the current transaction. You should only call this method if you've previously called
* {@link DatabaseEngine#beginTransaction()}.
Expand Down Expand Up @@ -397,21 +385,6 @@ default AbstractBatch createBatch(final int batchSize, final long batchTimeout,
*/
void addBatch(final String name, final EntityEntry entry) throws DatabaseEngineException;

/**
* Adds an entry to the batch upserting duplicate entries.
*
* @param name The entity name.
* @param entry The entry to persist.
*
* @implNote The default implementation of this method throws an {@link UnsupportedOperationException}
* for backward-compatibility reasons. If this method is supposed to be called, it must be explicitly overridden.
*
* @throws DatabaseEngineException If something goes wrong while persisting data.
*/
default void addBatchUpsert(final String name, final EntityEntry entry) throws DatabaseEngineException {
throw new UnsupportedOperationException("Method not implemented.");
}

/**
* Executes the given query.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ public class MappedEntity implements AutoCloseable {
* The prepared statement to insert new values.
*/
private PreparedStatement insertReturning = null;
/**
* The prepared statement to upsert new values to avoid duplicated keys violation.
*/
private PreparedStatement upsert = null;
/**
* The auto increment column if exists;
*/
Expand Down Expand Up @@ -149,35 +145,13 @@ public PreparedStatement getInsertWithAutoInc() {
* Sets the insert statement auto inc columns.
*
* @param insertWithAutoInc The insert statement with auto inc columns.
* @return This mapped entity;
* @return This mapped entity;
* @see DatabaseEngine#persist(String, EntityEntry, boolean)
*/
public MappedEntity setInsertWithAutoInc(final PreparedStatement insertWithAutoInc) {
closeQuietly(this.insertWithAutoInc);
this.insertWithAutoInc = insertWithAutoInc;

return this;
}

/**
* Gets the prepared statement for upsert operation.
*
* @return The upsert statement.
*/
public PreparedStatement getUpsert() {
return upsert;
}

/**
* Sets the upsert statement.
*
* @param upsert The upsert statement
* @return This mapped entity
*/
public MappedEntity setUpsert(final PreparedStatement upsert) {
closeQuietly(this.upsert);
this.upsert = upsert;


return this;
}

Expand Down Expand Up @@ -241,6 +215,5 @@ public void close() throws Exception {
closeQuietly(this.insert);
closeQuietly(this.insertWithAutoInc);
closeQuietly(this.insertReturning);
closeQuietly(this.upsert);
}
}
Loading