Skip to content

Commit

Permalink
Reverts the Support to Upsert/Insert Ignore on PDB (#471)
Browse files Browse the repository at this point in the history
Recently, it was added a support for the upsert of entries in the pdb library.

The feature added supported for entities to, when applicable, be able to be upserted or depending on the engine implementation, an insert ignore or even merge. The feature also enabled the upsert feature for batch processing, i.e. entries could now use the new upsert API. However, after its usage it was possible to conclude it decreased performance when client application used this new API. Giving the degrading performance and also the log messages some of the entities generated for not having the upsert statement available, it was decided to remove this feature entirely from the pdb code-base.

---
* Revert "Removes error logging for UPSERT command when creating an entity (#407)"
This reverts commit 94ba1a5.

* Revert "Add Support to Upsert/Insert Ignore on PDB (#392)"
This reverts commit 46f29b2.

* Revert "Add Support to Upsert/Insert Ignore on PDB (#389)"
This reverts commit 963cd30.

* Revert "Add Support to Upsert/Insert Ignore on PDB (#388)"
This reverts commit d4f7c4e.

* Revert "Add Support to Upsert/Insert Ignore on PDB"
This reverts commit d626297.

* Revert "Add Support to Upsert/Insert Ignore on PDB"
This reverts commit 1d32f7f.

* Revert "Add Support to Upsert/Insert Ignore on PDB"
This reverts commit 36a434c.

* Revert "Add Support to Upsert/Insert Ignore on PDB"
This reverts commit 1658965.

* Revert "Add Support to Upsert/Insert Ignore on PDB"
This reverts commit d2f153b.

* Revert "Add Support to Upsert/Insert Ignore on PDB"
This reverts commit 2f0d1ff

* Revert "Add Support to Upsert/Insert Ignore on PDB"
This reverts commit 348a22d

* Revert "Add Support to Upsert/Insert Ignore on PDB"
This reverts commit e100a51
  • Loading branch information
victorcmg-fdz authored Aug 22, 2024
1 parent d4995ce commit 4562805
Show file tree
Hide file tree
Showing 15 changed files with 40 additions and 528 deletions.
1 change: 0 additions & 1 deletion .github/workflows/test-oracle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ jobs:
name: Test Oracle PDB Java ${{ matrix.java-version }}

strategy:
max-parallel: 1
matrix:
java-version: [ 8, 11, 17 ]
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,17 +365,6 @@ protected AbstractBatch(final DatabaseEngine de, final int batchSize, final long
this(de, null, batchSize, batchTimeout, maxAwaitTimeShutdown);
}

/**
* A functional interface to represent a {@link java.util.function.BiConsumer} that throws an exception.
*
* @param <T> the type of the first argument to the operation.
* @param <R> the type of the second argument to the operation.
*/
@FunctionalInterface
private interface FlushConsumer<T, R> {
void accept(T t, R r) throws DatabaseEngineException;
}

/**
* Starts the timer task.
*/
Expand Down Expand Up @@ -463,24 +452,6 @@ public void add(final String entityName, final EntityEntry ee) throws DatabaseEn
* @implSpec Same as {@link #flush(boolean)} with {@code false}.
*/
public void flush() {
logger.trace("Start batch flushing entries.");
flush(this::processBatch);
}

/**
* Flushes the pending batches ignoring duplicate entries.
*/
public void flushUpsert() {
logger.trace("Start batch flushing upserting duplicated entries.");
flush((this::processBatchUpsert));
}

/**
* Flushes the pending batches given a processing callback function.
*
* @param processBatch A (throwing) BiConsumer to process the batch entries.
*/
private void flush(final FlushConsumer<DatabaseEngine, List<BatchEntry>> processBatch) {
this.metricsListener.onFlushTriggered();
final long flushTriggeredMs = System.currentTimeMillis();
List<BatchEntry> temp;
Expand Down Expand Up @@ -514,7 +485,7 @@ private void flush(final FlushConsumer<DatabaseEngine, List<BatchEntry>> process
this.metricsListener.onFlushStarted(flushTriggeredMs, temp.size());
start = System.currentTimeMillis();

processBatch.accept(de, temp);
processBatch(de, temp);

onFlushFinished(flushTriggeredMs, temp, Collections.emptyList());
logger.trace("[{}] Batch flushed. Took {} ms, {} rows.", name, System.currentTimeMillis() - start, temp.size());
Expand All @@ -539,7 +510,7 @@ private void flush(final FlushConsumer<DatabaseEngine, List<BatchEntry>> process
de.rollback();
}

processBatch.accept(de, temp);
processBatch(de, temp);

success = true;
} catch (final InterruptedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,29 +72,4 @@ protected void processBatch(final DatabaseEngine de, final List<BatchEntry> batc
de.flush();
de.commit();
}

/**
* Processes all batch entries ignoring duplicate entries.
*
* @implSpec Same as {@link #processBatch(DatabaseEngine, List)}}.
*
* @param de The {@link DatabaseEngine} on which to perform the flush.
* @param batchEntries The list of batch entries to be flushed.
* @throws DatabaseEngineException If the operation failed.
*/
protected void processBatchUpsert(final DatabaseEngine de, final List<BatchEntry> batchEntries) throws DatabaseEngineException {
/*
Begin transaction before the addBatch calls, in order to force the retry of the connection if it was lost during
or since the last batch. Otherwise, the addBatch call that uses a prepared statement will fail.
*/
de.beginTransaction();

for (final BatchEntry entry : batchEntries) {
de.addBatchUpsert(entry.getTableName(), entry.getEntityEntry());
}

de.flushUpsert();
de.commit();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,4 @@ public interface PdbBatch extends AutoCloseable {
* @return A void {@link CompletableFuture} that completes when the flush action finishes.
*/
CompletableFuture<Void> flushAsync() throws Exception;

/**
* Flushes the pending batches upserting entries to avoid duplicated key violations.
*
* @throws Exception If an error occurs while flushing.
*/
void flushUpsert() throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,6 @@ When done, the future removes itself (if done already, all this can be skipped).
}
}

@Override
public void flushUpsert() {
logger.error("Flush ignoring not available for MultithreadedBatch.");
throw new UnsupportedOperationException("Flushing pending batches upserting duplicated entries is not implemented using multiple threads/connections.");
}

/**
* Flushes the given list batch entries to {@link DatabaseEngine} immediately.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,6 @@ private void closeMappedEntity(final MappedEntity mappedEntity) {
final PreparedStatement insert = mappedEntity.getInsert();
final PreparedStatement insertReturning = mappedEntity.getInsertReturning();
final PreparedStatement insertWithAutoInc = mappedEntity.getInsertWithAutoInc();
final PreparedStatement upsert = mappedEntity.getUpsert();

if (!insert.isClosed()) {
insert.executeBatch();
Expand All @@ -517,10 +516,6 @@ private void closeMappedEntity(final MappedEntity mappedEntity) {
insertWithAutoInc.executeBatch();
}

if (upsert != null && !upsert.isClosed()) {
upsert.executeBatch();
}

} catch (final SQLException e) {
logger.debug(String.format("Failed to flush before closing mapped entity '%s'",
mappedEntity.getEntity().getName()), e);
Expand Down Expand Up @@ -954,29 +949,6 @@ public synchronized void flush() throws DatabaseEngineException {
}
}

/**
* Flushes the batches for all the registered entities, upserting any following .
*
* @throws DatabaseEngineException If something goes wrong while persisting data.
*/
@Override
public synchronized void flushUpsert() throws DatabaseEngineException {
/*
* Reconnect on this method does not make sense since a new connection will have nothing to flush.
*/

try {
for (MappedEntity me : entities.values()) {
final PreparedStatement upsert = me.getUpsert();
if (upsert != null) {
upsert.executeBatch();
}
}
} catch (final Exception ex) {
throw getQueryExceptionHandler().handleException(ex, "Something went wrong while flushing");
}
}

/**
* Commits the current transaction. You should only call this method if you've previously called {@link AbstractDatabaseEngine#beginTransaction()}.
*
Expand Down Expand Up @@ -1015,9 +987,6 @@ public synchronized void rollback() throws DatabaseEngineRuntimeException {
mappedEntity.getInsert().clearBatch();
mappedEntity.getInsertReturning().clearBatch();
mappedEntity.getInsertWithAutoInc().clearBatch();
if (mappedEntity.getUpsert() != null) {
mappedEntity.getUpsert().clearBatch();
}
}
conn.rollback();
conn.setAutoCommit(true);
Expand Down Expand Up @@ -1346,31 +1315,6 @@ public synchronized void addBatch(final String name, final EntityEntry entry) th

}

@Override
public synchronized void addBatchUpsert(final String name, final EntityEntry entry) throws DatabaseEngineException {
try {

final MappedEntity me = entities.get(name);

if (me == null) {
throw new DatabaseEngineException(String.format("Unknown entity '%s'", name));
}

PreparedStatement ps = me.getUpsert();

if (ps == null) {
throw new DatabaseEngineException(String.format("Error adding to batch: Entity %s has a null merge/upsert statement.", name));
}

entityToPreparedStatementForBatch(me.getEntity(), ps, entry, true);

ps.addBatch();
} catch (final Exception ex) {
throw new DatabaseEngineException("Error adding to batch", ex);
}

}

/**
* Translates the given entry entity to the prepared statement when used in the context of batch updates.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,18 +187,6 @@ public interface DatabaseEngine extends AutoCloseable {
*/
void flush() throws DatabaseEngineException;

/**
* Flushes the batches for all the registered entities upserting duplicated entries.
*
* @implNote The default implementation of this method throws an {@link UnsupportedOperationException}
* for backward-compatibility reasons. If this method is supposed to be called, it must be explicitly overridden.
*
* @throws DatabaseEngineException If something goes wrong while persisting data.
*/
default void flushUpsert() throws DatabaseEngineException {
throw new UnsupportedOperationException("Method not implemented.");
}

/**
* Commits the current transaction. You should only call this method if you've previously called
* {@link DatabaseEngine#beginTransaction()}.
Expand Down Expand Up @@ -397,21 +385,6 @@ default AbstractBatch createBatch(final int batchSize, final long batchTimeout,
*/
void addBatch(final String name, final EntityEntry entry) throws DatabaseEngineException;

/**
* Adds an entry to the batch upserting duplicate entries.
*
* @param name The entity name.
* @param entry The entry to persist.
*
* @implNote The default implementation of this method throws an {@link UnsupportedOperationException}
* for backward-compatibility reasons. If this method is supposed to be called, it must be explicitly overridden.
*
* @throws DatabaseEngineException If something goes wrong while persisting data.
*/
default void addBatchUpsert(final String name, final EntityEntry entry) throws DatabaseEngineException {
throw new UnsupportedOperationException("Method not implemented.");
}

/**
* Executes the given query.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ public class MappedEntity implements AutoCloseable {
* The prepared statement to insert new values.
*/
private PreparedStatement insertReturning = null;
/**
* The prepared statement to upsert new values to avoid duplicated keys violation.
*/
private PreparedStatement upsert = null;
/**
* The auto increment column if exists;
*/
Expand Down Expand Up @@ -149,35 +145,13 @@ public PreparedStatement getInsertWithAutoInc() {
* Sets the insert statement auto inc columns.
*
* @param insertWithAutoInc The insert statement with auto inc columns.
* @return This mapped entity;
* @return This mapped entity;
* @see DatabaseEngine#persist(String, EntityEntry, boolean)
*/
public MappedEntity setInsertWithAutoInc(final PreparedStatement insertWithAutoInc) {
closeQuietly(this.insertWithAutoInc);
this.insertWithAutoInc = insertWithAutoInc;

return this;
}

/**
* Gets the prepared statement for upsert operation.
*
* @return The upsert statement.
*/
public PreparedStatement getUpsert() {
return upsert;
}

/**
* Sets the upsert statement.
*
* @param upsert The upsert statement
* @return This mapped entity
*/
public MappedEntity setUpsert(final PreparedStatement upsert) {
closeQuietly(this.upsert);
this.upsert = upsert;


return this;
}

Expand Down Expand Up @@ -241,6 +215,5 @@ public void close() throws Exception {
closeQuietly(this.insert);
closeQuietly(this.insertWithAutoInc);
closeQuietly(this.insertReturning);
closeQuietly(this.upsert);
}
}
Loading

0 comments on commit 4562805

Please sign in to comment.