Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-3721 - Use write lock when reserving incomplete blocks #91

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,27 @@
*/
package uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.repositories;

import org.springframework.data.jpa.repository.Lock;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock;

import javax.persistence.LockModeType;
import java.time.LocalDateTime;
import java.util.List;

@Repository
public interface ContiguousIdBlockRepository extends CrudRepository<ContiguousIdBlock, Long> {
@Query("SELECT cib FROM ContiguousIdBlock cib WHERE cib.categoryId = :categoryId AND cib.lastCommitted != cib.lastValue AND (cib.reserved IS NULL OR cib.reserved IS FALSE) ORDER BY cib.lastValue asc")
// The pessimistic write lock ("select for update" in SQL) ensures that multiple application instances running
// concurrently won't reserve the same incomplete blocks. This will prevent any other application from accessing
// these rows until the transaction is either rolled back or committed (i.e., other applications using this method
// will be blocked).
// Note that application instances reserving the same new blocks is prevented by the uniqueness constraint in the
// database and subsequent retry in the accession generator.
@Lock(LockModeType.PESSIMISTIC_WRITE)
List<ContiguousIdBlock> findUncompletedAndUnreservedBlocksOrderByLastValueAsc(@Param("categoryId") String categoryId);

ContiguousIdBlock findFirstByCategoryIdOrderByLastValueDesc(String categoryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
Expand Down Expand Up @@ -61,6 +63,8 @@
*/
public class ContiguousIdBlockService {

private static final Logger logger = LoggerFactory.getLogger(ContiguousIdBlockService.class);

private ContiguousIdBlockRepository repository;

private Map<String, BlockParameters> categoryBlockInitializations;
Expand All @@ -76,24 +80,37 @@ public ContiguousIdBlockService(ContiguousIdBlockRepository repository, Map<Stri

@Transactional(isolation = Isolation.SERIALIZABLE)
public void save(Iterable<ContiguousIdBlock> blocks) {
logger.trace("Inside blockService save (multiple)");
// release block if full
blocks.forEach(block -> {if (block.isFull()) {block.releaseReserved();}});
blocks.forEach(block -> {
logger.trace("Block: {}", block);
if (block.isFull()) {
logger.trace("Releasing block");
block.releaseReserved();
}
});
repository.saveAll(blocks);
logger.trace("All blocks saved");
entityManager.flush();
}

@Transactional(isolation = Isolation.SERIALIZABLE)
public void save(ContiguousIdBlock block) {
logger.trace("Inside blockService save (single)");
logger.trace("Block: {}", block);
// release block if full
if (block.isFull()) {
logger.trace("Releasing block");
block.releaseReserved();
}
repository.save(block);
logger.trace("Block saved");
entityManager.flush();
}

@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRES_NEW)
public ContiguousIdBlock reserveNewBlock(String categoryId, String instanceId) {
logger.trace("Inside reserveNewBlock for instanceId {}", instanceId);
ContiguousIdBlock lastBlock = repository.findFirstByCategoryIdOrderByLastValueDesc(categoryId);
BlockParameters blockParameters = getBlockParameters(categoryId);
ContiguousIdBlock reservedBlock;
Expand All @@ -107,6 +124,7 @@ public ContiguousIdBlock reserveNewBlock(String categoryId, String instanceId) {
blockParameters.getBlockSize());
reservedBlock = repository.save(newBlock);
}
logger.trace("Reserved new block: {}", reservedBlock);
entityManager.flush();
return reservedBlock;
}
Expand All @@ -117,8 +135,10 @@ public BlockParameters getBlockParameters(String categoryId) {

@Transactional(isolation = Isolation.SERIALIZABLE)
public List<ContiguousIdBlock> reserveUncompletedBlocksForCategoryIdAndApplicationInstanceId(String categoryId, String applicationInstanceId) {
logger.trace("Inside reserveUncompletedBlocks for instanceId {}", applicationInstanceId);
List<ContiguousIdBlock> blockList = repository.findUncompletedAndUnreservedBlocksOrderByLastValueAsc(categoryId);
blockList.stream().forEach(block -> {
logger.trace("Reserving incomplete and unreserved block {}", block);
block.setApplicationInstanceId(applicationInstanceId);
block.markAsReserved();
});
Expand Down
Loading