Skip to content

Commit

Permalink
feat: parquet support for data retrieval webservice
Browse files Browse the repository at this point in the history
  • Loading branch information
Nolife999 committed Sep 12, 2024
1 parent 8a8e55c commit 0820e4a
Show file tree
Hide file tree
Showing 12 changed files with 403 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,35 @@ public class ParquetDao {

ParquetEncryptionKey encryptionKey;

boolean exportIfEmpty;


/**
* Export to parquet
*
* Export to parquet. Empty tables won't be exported.
* @param tables
* @param outputDirectory
* @param encryptionKey
* @throws ArcException
*/
public void exportToParquet(List<TableToRetrieve> tables, String outputDirectory,
		ParquetEncryptionKey encryptionKey) throws ArcException {
	// Convenience overload: delegate to the full variant without exporting empty tables.
	final boolean exportIfEmpty = false;
	exportToParquet(tables, outputDirectory, encryptionKey, exportIfEmpty);
}


/**
* Export to parquet
* @param tables
* @param outputDirectory
* @param encryptionKey
* @param exportIfEmpty true to export a table even when it contains no rows
* @throws ArcException
*/
public void exportToParquet(List<TableToRetrieve> tables, String outputDirectory,
ParquetEncryptionKey encryptionKey, boolean exportIfEmpty) throws ArcException {

this.encryptionKey=encryptionKey;
this.exportIfEmpty=exportIfEmpty;

// load duckdb extension
loadDuckdb();
Expand Down Expand Up @@ -130,7 +146,7 @@ private void exportExecutorTableToParquet(Connection connection, TableToRetrieve
query.append("SELECT * FROM " + attachedTableName(connectionIndex, table.getTableName()));
}

if (checkNotEmpty(connection, query)) {
if (checkExportCondition(connection, query)) {
executeCopy(connection, query, outputFileName);
}

Expand All @@ -153,11 +169,24 @@ private void exportCoordinatorTableToParquet(Connection connection, TableToRetri
GenericPreparedStatementBuilder query = new GenericPreparedStatementBuilder();
query.append("SELECT * FROM " + attachedTableName(ArcDatabase.COORDINATOR.getIndex(), table.getTableName()));

if (checkNotEmpty(connection, query)) {
if (checkExportCondition(connection, query)) {
executeCopy(connection, query, outputFileName);
}
}

/**
 * Decide whether the export copy must be executed for the given selection query.
 * The copy proceeds when empty exports are allowed, or when the selected table
 * actually contains rows.
 *
 * @param connection the database connection
 * @param query the selection query whose result would be exported
 * @return true when the export must be executed
 * @throws SQLException if the emptiness check fails
 */
private boolean checkExportCondition(Connection connection, GenericPreparedStatementBuilder query) throws SQLException {
	if (this.exportIfEmpty) {
		return true;
	}
	return checkNotEmpty(connection, query);
}


/**
* check if the table selected by the query is not empty
* @param connection
Expand Down Expand Up @@ -334,7 +363,7 @@ private void loadDuckdb() throws ArcException {
* @param outputDirectory
* @return
*/
protected String exportTablePath(TableToRetrieve table, String outputDirectory)
public static String exportTablePath(TableToRetrieve table, String outputDirectory)
{
return outputDirectory + File.separator + FormatSQL.extractTableNameToken(table.getTableName())
+ PARQUET_FILE_EXTENSION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,22 @@ public int getInt(Connection connexion, GenericPreparedStatementBuilder sql, Mod
}
return ZERO;
}


/**
 * Executes a query expected to return exactly one value, and parses it as a
 * {@code long}.
 * Note: throws {@link NumberFormatException} if the returned value is not a
 * parsable long.
 *
 * @param connexion the database connection
 * @param sql       the query to execute
 * @param modes     optional query execution modes
 * @return the single value returned by the query, as a long
 * @throws ArcException if the query execution fails
 */
public long getLong(Connection connexion, GenericPreparedStatementBuilder sql, ModeRequete... modes) throws ArcException {
List<List<String>> returned = executeRequest(connexion, sql, modes);
return Long.parseLong(returned.get(EXECUTE_REQUEST_DATA_START_INDEX).get(0));
}

/**
* Check if a column exists in a table
*
Expand Down
13 changes: 13 additions & 0 deletions arc-utils/src/main/java/fr/insee/arc/utils/files/FileUtilsArc.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,19 @@ public static void delete(File fileInput) throws ArcException {
}
}

/**
 * Delete the given file when it is present on the filesystem; no-op otherwise.
 *
 * @param fileInput the file to remove
 * @throws ArcException if the deletion fails
 */
public static void deleteIfExists(File fileInput) throws ArcException {
	if (fileInput.exists()) {
		delete(fileInput);
	}
}


/**
* Deplacer un fichier d'un repertoire source vers répertoire cible (pas de
* slash en fin du nom de repertoire) Si le fichier existe déjà, il est écrasé
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package fr.insee.arc.ws.services.importServlet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import fr.insee.arc.core.dataobjects.SchemaEnum;
import fr.insee.arc.core.util.BDParameters;
import fr.insee.arc.utils.database.ArcDatabase;
import fr.insee.arc.utils.database.TableToRetrieve;
import fr.insee.arc.utils.exception.ArcException;
import fr.insee.arc.utils.exception.ArcExceptionMessage;
import fr.insee.arc.utils.utils.LoggerHelper;
import fr.insee.arc.ws.services.importServlet.actions.SendResponse;
import fr.insee.arc.ws.services.importServlet.bo.ArcClientIdentifier;
import fr.insee.arc.ws.services.importServlet.bo.ExportSource;
Expand All @@ -35,18 +40,40 @@ public ImportStep1InitializeClientTablesService(ArcClientIdentifier arcClientIde

}

private void executeIf(ExportSource source, Executable exe) throws ArcException {
if (!arcClientIdentifier.getSource().contains(source)) {
private void executeIfNotMetadataSchema(Executable exe) throws ArcException {
	// this action is only relevant on sandbox environments, never on the metadata schema
	final boolean isMetadataSchema = arcClientIdentifier.getEnvironnement()
			.equalsIgnoreCase(SchemaEnum.ARC_METADATA.getSchemaName());
	executeIf(!isMetadataSchema, exe);
}


private void executeIfSourceDeclared(ExportSource source, Executable exe) throws ArcException {
	// run the action only when the client declared this export source
	final boolean sourceDeclared = arcClientIdentifier.getSource().contains(source);
	executeIf(sourceDeclared, exe);
}

private void executeIfParquetDeclared(Executable exe) throws ArcException {
	// run the action only when the client requested the parquet output format
	final boolean parquetDeclared = arcClientIdentifier.getFormat().isParquet();
	executeIf(parquetDeclared, exe);
}


/**
 * Execute the given action only when the condition holds.
 * The parameter is a primitive {@code boolean} (rather than boxed
 * {@code Boolean}) to rule out a NullPointerException on unboxing; all call
 * sites pass boolean expressions, so this stays source-compatible.
 *
 * @param condition guard; the action is skipped when false
 * @param exe the action to run
 * @throws ArcException propagated from the action
 */
private void executeIf(boolean condition, Executable exe) throws ArcException {
	if (condition) {
		exe.execute();
	}
}

public void execute(SendResponse resp) throws ArcException {

LoggerHelper.info(LOGGER, "Data retrieval webservice invoked for client " + this.arcClientIdentifier.getClientIdentifier() + " on "+ this.arcClientIdentifier.getEnvironnement() + " with timestamp "+ this.arcClientIdentifier.getTimestamp());

// drop tables from the client that had been requested from a former call
dropPendingClientTables();
long formerCallTimestamp = clientDao.extractWsTrackingTimestamp();
if (formerCallTimestamp > 0L)
{
LoggerHelper.info(LOGGER, "CONCURRENT CLIENT CALL WITH TIMESTAMP " + formerCallTimestamp );
}

// drop tables from the client that had been requested from a former call
this.clientDao.dropPendingClientObjects();

// create the table that will track the data table which has been built and retrieved
createTrackTable();

Expand All @@ -55,7 +82,7 @@ public void execute(SendResponse resp) throws ArcException {
createWsTables();

// create tables to retrieve family data table
createFamilyMappingTables();
executeIfNotMetadataSchema(() -> createFamilyMappingTables());

// create data table in an asynchronous parallel thread
startTableCreationInParallel();
Expand All @@ -74,11 +101,6 @@ public void execute(SendResponse resp) throws ArcException {
*/
private void createFamilyMappingTables() throws ArcException {

// mapping tables can only be requested on sandbox environments
if (arcClientIdentifier.getEnvironnement().equalsIgnoreCase(SchemaEnum.ARC_METADATA.getSchemaName())) {
return;
}

// check if client is allowed to retrieve family data
if (!clientDao.verificationClientFamille()) {
throw new ArcException(ArcExceptionMessage.WS_RETRIEVE_DATA_FAMILY_FORBIDDEN);
Expand Down Expand Up @@ -113,7 +135,10 @@ private void createTrackTable() throws ArcException {
* @throws ArcException
*/
private void createWsTables() throws ArcException {
this.clientDao.createTableWsInfo();

List<TableToRetrieve> tablesToExport = new ArrayList<>();
tablesToExport.addAll(this.clientDao.createTableWsInfo());
executeIfParquetDeclared(() -> exportToParquet(tablesToExport));
}

/**
Expand All @@ -127,15 +152,21 @@ private void startTableCreationInParallel() {
@Override
public void run() {
try {

List<TableToRetrieve> tablesToExport = new ArrayList<>();

if (tablesMetierNames != null) {
executeIf(ExportSource.MAPPING, () -> createImages(tablesMetierNames));
executeIf(ExportSource.METADATA, () -> clientDao.createTableMetier());
executeIf(ExportSource.METADATA, () -> clientDao.createTableVarMetier());

executeIfSourceDeclared(ExportSource.MAPPING, () -> tablesToExport.addAll(createImages(tablesMetierNames)));
executeIfSourceDeclared(ExportSource.METADATA, () -> tablesToExport.addAll(clientDao.createTableMetier()));
executeIfSourceDeclared(ExportSource.METADATA, () -> tablesToExport.addAll(clientDao.createTableVarMetier()));
}

executeIf(ExportSource.NOMENCLATURE, () -> clientDao.createTableNmcl());
executeIf(ExportSource.METADATA, () -> clientDao.createTableFamille());
executeIf(ExportSource.METADATA, () -> clientDao.createTablePeriodicite());
executeIfSourceDeclared(ExportSource.NOMENCLATURE, () -> tablesToExport.addAll(clientDao.createTableNmcl()));
executeIfSourceDeclared(ExportSource.METADATA, () -> tablesToExport.addAll(clientDao.createTableFamille()));
executeIfSourceDeclared(ExportSource.METADATA, () -> tablesToExport.addAll(clientDao.createTablePeriodicite()));

executeIfParquetDeclared(() -> exportToParquet(tablesToExport));

} catch (ArcException e) {
e.logFullException();
Expand All @@ -155,35 +186,23 @@ public void run() {
maintenance.start();
}


/**
 * Drop the client's pending tables on the coordinator and on every executor
 * nod, if they exist.
 * @throws ArcException
 */
private void dropPendingClientTables() throws ArcException {

	// connection index 0 — presumably the coordinator nod; matches ArcDatabase.COORDINATOR usage elsewhere
	this.clientDao.dropPendingClientTables(0);

	// iterate over the executor nods; their connection indexes start at EXECUTOR's index
	int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods();
	for (int executorConnectionId = ArcDatabase.EXECUTOR.getIndex(); executorConnectionId < ArcDatabase.EXECUTOR
			.getIndex() + numberOfExecutorNods; executorConnectionId++) {
		this.clientDao.dropPendingClientTables(executorConnectionId);
	}
}



/**
* create image tables on executor nods if connection is scaled, on coordinator
* nod if not
*
* @param tablesMetierNames
* @throws ArcException
*/
private void createImages(List<String> tablesMetierNames) throws ArcException {
private List<TableToRetrieve> createImages(List<String> tablesMetierNames) throws ArcException {
int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods();

List<TableToRetrieve> tablesToRetrieve = new ArrayList<>();

if (numberOfExecutorNods == 0) {
clientDao.createImages(tablesMetierNames, ArcDatabase.COORDINATOR.getIndex());
clientDao.createImages(tablesMetierNames, ArcDatabase.COORDINATOR.getIndex())
.forEach(t -> tablesToRetrieve.add(new TableToRetrieve(ArcDatabase.COORDINATOR, t)));


} else {
for (int executorConnectionId = ArcDatabase.EXECUTOR.getIndex(); executorConnectionId < ArcDatabase.EXECUTOR
.getIndex() + numberOfExecutorNods; executorConnectionId++) {
Expand All @@ -192,9 +211,18 @@ private void createImages(List<String> tablesMetierNames) throws ArcException {
clientDao.copyTableOfIdSourceToExecutorNod(executorConnectionId);

// create the business table containing data of id_source found in table tableOfIdSource
clientDao.createImages(tablesMetierNames, executorConnectionId);
clientDao.createImages(tablesMetierNames, executorConnectionId)
.forEach(t -> tablesToRetrieve.add(new TableToRetrieve(ArcDatabase.EXECUTOR, t)))
;
}
}

return tablesToRetrieve;
}

/**
 * Export the given tables to parquet through the client DAO.
 *
 * @param tablesToExport the tables to serialize as parquet
 * @throws ArcException on export failure
 */
private void exportToParquet(List<TableToRetrieve> tablesToExport) throws ArcException {
	this.clientDao.exportToParquet(tablesToExport);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
public class ImportStep2GetTableNameService {

protected static final Logger LOGGER = LogManager.getLogger(ImportStep2GetTableNameService.class);

private static final String NO_MORE_TABLE_TO_BE_RETRIEVED = " ";

private ClientDao clientDao;

Expand Down Expand Up @@ -80,7 +82,7 @@ public void execute(SendResponse resp) throws ArcException {
table = this.clientDao.getAClientTableByType(ExportTrackingType.TRACK);
this.clientDao.dropTable(table);

resp.send(" ");
resp.send(NO_MORE_TABLE_TO_BE_RETRIEVED);
resp.endSending();

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package fr.insee.arc.ws.services.importServlet;

import java.io.IOException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;

import fr.insee.arc.utils.database.TableToRetrieve;
import fr.insee.arc.utils.exception.ArcException;
import fr.insee.arc.utils.utils.LoggerHelper;
import fr.insee.arc.utils.utils.Sleep;
import fr.insee.arc.ws.services.importServlet.actions.SendResponse;
import fr.insee.arc.ws.services.importServlet.bo.ArcClientIdentifier;
import fr.insee.arc.ws.services.importServlet.bo.ArcClientIdentifierUnsafe;
import fr.insee.arc.ws.services.importServlet.bo.ExportFormat;
import fr.insee.arc.ws.services.importServlet.dao.ClientDao;
import fr.insee.arc.ws.services.importServlet.dao.ServiceDao;

Expand All @@ -37,18 +37,23 @@ public ImportStep3GetTableDataService(ArcClientIdentifier arcClientIdentifier) {
public void execute(SendResponse resp) throws ArcException {

TableToRetrieve table = clientDao.getAClientTableByName(arcClientIdentifier.getClientInputParameter());
// binary transfer
ServiceDao.execQueryExportDataToResponse(resp.getWr(), table, ExportFormat.isCsv(this.arcClientIdentifier.getFormat()));

// transfer data to http response
ServiceDao.execQueryExportDataToResponse(resp.getWr(), table, this.arcClientIdentifier.getFormat(), clientDao);

if (this.clientDao.isWebServiceNotPending()) {
this.clientDao.dropTable(table);
this.clientDao.deleteParquet(table);
this.clientDao.deleteFromTrackTable(table.getTableName());

LoggerHelper.info(LOGGER, "Table " + table.getTableName() + " had been retrieved and dropped.");

} else {
Sleep.sleep(WAIT_DELAY_ON_PENDING_TABLES_CREATION_IN_MS);
}

resp.endSending();
resp.endSending();

}

}
Loading

0 comments on commit 0820e4a

Please sign in to comment.