From 0820e4a8a7050e8c4272a56e8f4553e27943abbf Mon Sep 17 00:00:00 2001 From: Manuel Soulier Date: Thu, 12 Sep 2024 17:12:02 +0200 Subject: [PATCH] feat: parquet support for data retrieval webservice --- .../service/p6export/parquet/ParquetDao.java | 39 ++++- .../fr/insee/arc/utils/dao/UtilitaireDao.java | 17 +- .../insee/arc/utils/files/FileUtilsArc.java | 13 ++ ...ortStep1InitializeClientTablesService.java | 104 ++++++++----- .../ImportStep2GetTableNameService.java | 4 +- .../ImportStep3GetTableDataService.java | 19 ++- .../importServlet/bo/ArcClientIdentifier.java | 24 ++- .../importServlet/bo/ExportFormat.java | 10 +- .../services/importServlet/dao/ClientDao.java | 146 +++++++++++++++--- .../importServlet/dao/ServiceDao.java | 99 +++++++++--- .../provider/DirectoryPathExportWs.java | 25 +++ .../importServlet/bo/ExportFormatTest.java | 7 +- 12 files changed, 403 insertions(+), 104 deletions(-) create mode 100644 arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/provider/DirectoryPathExportWs.java diff --git a/arc-core/src/main/java/fr/insee/arc/core/service/p6export/parquet/ParquetDao.java b/arc-core/src/main/java/fr/insee/arc/core/service/p6export/parquet/ParquetDao.java index b2af6a238..1960a564d 100644 --- a/arc-core/src/main/java/fr/insee/arc/core/service/p6export/parquet/ParquetDao.java +++ b/arc-core/src/main/java/fr/insee/arc/core/service/p6export/parquet/ParquetDao.java @@ -45,10 +45,11 @@ public class ParquetDao { ParquetEncryptionKey encryptionKey; + boolean exportIfEmpty; + /** - * Export to parquet - * + * Export to parquet. Empty tables won't be exported. 
* @param tables * @param outputDirectory * @param encryptionKey @@ -56,8 +57,23 @@ public class ParquetDao { */ public void exportToParquet(List tables, String outputDirectory, ParquetEncryptionKey encryptionKey) throws ArcException { + exportToParquet(tables, outputDirectory, encryptionKey, false); + } + + + /** + * Export to parquet + * @param tables + * @param outputDirectory + * @param encryptionKey + * @param exportIfEmpty when true, tables are exported even if they are empty + * @throws ArcException + */ + public void exportToParquet(List tables, String outputDirectory, + ParquetEncryptionKey encryptionKey, boolean exportIfEmpty) throws ArcException { this.encryptionKey=encryptionKey; + this.exportIfEmpty=exportIfEmpty; // load duckdb extension loadDuckdb(); @@ -130,7 +146,7 @@ private void exportExecutorTableToParquet(Connection connection, TableToRetrieve query.append("SELECT * FROM " + attachedTableName(connectionIndex, table.getTableName())); } - if (checkNotEmpty(connection, query)) { + if (checkExportCondition(connection, query)) { executeCopy(connection, query, outputFileName); } @@ -153,11 +169,24 @@ private void exportCoordinatorTableToParquet(Connection connection, TableToRetri GenericPreparedStatementBuilder query = new GenericPreparedStatementBuilder(); query.append("SELECT * FROM " + attachedTableName(ArcDatabase.COORDINATOR.getIndex(), table.getTableName())); - if (checkNotEmpty(connection, query)) { + if (checkExportCondition(connection, query)) { executeCopy(connection, query, outputFileName); } } + /** + * check if export must be executed + * @param connection + * @param query + * @return + * @throws SQLException + */ + private boolean checkExportCondition(Connection connection, GenericPreparedStatementBuilder query) throws SQLException + { + return this.exportIfEmpty || checkNotEmpty(connection, query); + } + + /** * check if the table selected by the query is not empty * @param connection @@ -334,7 +363,7 @@ private void loadDuckdb() throws ArcException { * @param outputDirectory * 
@return */ - protected String exportTablePath(TableToRetrieve table, String outputDirectory) + public static String exportTablePath(TableToRetrieve table, String outputDirectory) { return outputDirectory + File.separator + FormatSQL.extractTableNameToken(table.getTableName()) + PARQUET_FILE_EXTENSION; diff --git a/arc-utils/src/main/java/fr/insee/arc/utils/dao/UtilitaireDao.java b/arc-utils/src/main/java/fr/insee/arc/utils/dao/UtilitaireDao.java index e2e8325d3..ca330792e 100644 --- a/arc-utils/src/main/java/fr/insee/arc/utils/dao/UtilitaireDao.java +++ b/arc-utils/src/main/java/fr/insee/arc/utils/dao/UtilitaireDao.java @@ -249,7 +249,22 @@ public int getInt(Connection connexion, GenericPreparedStatementBuilder sql, Mod } return ZERO; } - + + /** + * Exécute une requête qui renvoie exactement un argument de type + * {@link Long}. + * + * @param connexion la connexion à la base + * @param sql la requête + * @param modes les modes de la requête (optionnels) + * @return + * @throws ArcException + */ + public long getLong(Connection connexion, GenericPreparedStatementBuilder sql, ModeRequete... 
modes) throws ArcException { + List> returned = executeRequest(connexion, sql, modes); + return Long.parseLong(returned.get(EXECUTE_REQUEST_DATA_START_INDEX).get(0)); + } + /** * Check if a column exists in a table * diff --git a/arc-utils/src/main/java/fr/insee/arc/utils/files/FileUtilsArc.java b/arc-utils/src/main/java/fr/insee/arc/utils/files/FileUtilsArc.java index cb8606125..019b0e5df 100644 --- a/arc-utils/src/main/java/fr/insee/arc/utils/files/FileUtilsArc.java +++ b/arc-utils/src/main/java/fr/insee/arc/utils/files/FileUtilsArc.java @@ -118,6 +118,19 @@ public static void delete(File fileInput) throws ArcException { } } + /** + * Delete a file if it exists + */ + public static void deleteIfExists(File fileInput) throws ArcException { + + if (!fileInput.exists()) + return; + + delete(fileInput); + + } + + /** * Deplacer un fichier d'un repertoire source vers répertoire cible (pas de * slash en fin du nom de repertoire) Si le fichier existe déjà, il est écrasé diff --git a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/ImportStep1InitializeClientTablesService.java b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/ImportStep1InitializeClientTablesService.java index 42e531253..407ab1233 100644 --- a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/ImportStep1InitializeClientTablesService.java +++ b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/ImportStep1InitializeClientTablesService.java @@ -1,14 +1,19 @@ package fr.insee.arc.ws.services.importServlet; +import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import fr.insee.arc.core.dataobjects.SchemaEnum; +import fr.insee.arc.core.util.BDParameters; import fr.insee.arc.utils.database.ArcDatabase; +import fr.insee.arc.utils.database.TableToRetrieve; import fr.insee.arc.utils.exception.ArcException; import fr.insee.arc.utils.exception.ArcExceptionMessage; 
+import fr.insee.arc.utils.utils.LoggerHelper; import fr.insee.arc.ws.services.importServlet.actions.SendResponse; import fr.insee.arc.ws.services.importServlet.bo.ArcClientIdentifier; import fr.insee.arc.ws.services.importServlet.bo.ExportSource; @@ -35,18 +40,40 @@ public ImportStep1InitializeClientTablesService(ArcClientIdentifier arcClientIde } - private void executeIf(ExportSource source, Executable exe) throws ArcException { - if (!arcClientIdentifier.getSource().contains(source)) { + private void executeIfNotMetadataSchema(Executable exe) throws ArcException { + executeIf(!arcClientIdentifier.getEnvironnement().equalsIgnoreCase(SchemaEnum.ARC_METADATA.getSchemaName()), exe); + } + + + private void executeIfSourceDeclared(ExportSource source, Executable exe) throws ArcException { + executeIf(arcClientIdentifier.getSource().contains(source), exe); + } + + private void executeIfParquetDeclared(Executable exe) throws ArcException { + executeIf(arcClientIdentifier.getFormat().isParquet(), exe); + } + + + private void executeIf(Boolean condition, Executable exe) throws ArcException { + if (!condition) { return; } exe.execute(); } public void execute(SendResponse resp) throws ArcException { + + LoggerHelper.info(LOGGER, "Data retrieval webservice invoked for client " + this.arcClientIdentifier.getClientIdentifier() + " on "+ this.arcClientIdentifier.getEnvironnement() + " with timestamp "+ this.arcClientIdentifier.getTimestamp()); - // drop tables from the client that had been requested from a former call - dropPendingClientTables(); + long formerCallTimestamp = clientDao.extractWsTrackingTimestamp(); + if (formerCallTimestamp > 0L) + { + LoggerHelper.info(LOGGER, "CONCURRENT CLIENT CALL WITH TIMESTAMP " + formerCallTimestamp ); + } + // drop tables from the client that had been requested from a former call + this.clientDao.dropPendingClientObjects(); + // create the table that will track the data table which has been built and retrieved createTrackTable(); @@ 
-55,7 +82,7 @@ public void execute(SendResponse resp) throws ArcException { createWsTables(); // create tables to retrieve family data table - createFamilyMappingTables(); + executeIfNotMetadataSchema(() -> createFamilyMappingTables()); // create data table in an asynchronous parallel thread startTableCreationInParallel(); @@ -74,11 +101,6 @@ public void execute(SendResponse resp) throws ArcException { */ private void createFamilyMappingTables() throws ArcException { - // mapping tables can only be requested on sandbox environments - if (arcClientIdentifier.getEnvironnement().equalsIgnoreCase(SchemaEnum.ARC_METADATA.getSchemaName())) { - return; - } - // check if client is allowed to retrieve family data if (!clientDao.verificationClientFamille()) { throw new ArcException(ArcExceptionMessage.WS_RETRIEVE_DATA_FAMILY_FORBIDDEN); @@ -113,7 +135,10 @@ private void createTrackTable() throws ArcException { * @throws ArcException */ private void createWsTables() throws ArcException { - this.clientDao.createTableWsInfo(); + + List tablesToExport = new ArrayList<>(); + tablesToExport.addAll(this.clientDao.createTableWsInfo()); + executeIfParquetDeclared(() -> exportToParquet(tablesToExport)); } /** @@ -127,15 +152,21 @@ private void startTableCreationInParallel() { @Override public void run() { try { + + List tablesToExport = new ArrayList<>(); + if (tablesMetierNames != null) { - executeIf(ExportSource.MAPPING, () -> createImages(tablesMetierNames)); - executeIf(ExportSource.METADATA, () -> clientDao.createTableMetier()); - executeIf(ExportSource.METADATA, () -> clientDao.createTableVarMetier()); + + executeIfSourceDeclared(ExportSource.MAPPING, () -> tablesToExport.addAll(createImages(tablesMetierNames))); + executeIfSourceDeclared(ExportSource.METADATA, () -> tablesToExport.addAll(clientDao.createTableMetier())); + executeIfSourceDeclared(ExportSource.METADATA, () -> tablesToExport.addAll(clientDao.createTableVarMetier())); } - executeIf(ExportSource.NOMENCLATURE, () -> 
clientDao.createTableNmcl()); - executeIf(ExportSource.METADATA, () -> clientDao.createTableFamille()); - executeIf(ExportSource.METADATA, () -> clientDao.createTablePeriodicite()); + executeIfSourceDeclared(ExportSource.NOMENCLATURE, () -> tablesToExport.addAll(clientDao.createTableNmcl())); + executeIfSourceDeclared(ExportSource.METADATA, () -> tablesToExport.addAll(clientDao.createTableFamille())); + executeIfSourceDeclared(ExportSource.METADATA, () -> tablesToExport.addAll(clientDao.createTablePeriodicite())); + + executeIfParquetDeclared(() -> exportToParquet(tablesToExport)); } catch (ArcException e) { e.logFullException(); @@ -155,24 +186,6 @@ public void run() { maintenance.start(); } - - /** - * drop tables on coordinator and executors if the exists - * @throws ArcException - */ - private void dropPendingClientTables() throws ArcException { - - this.clientDao.dropPendingClientTables(0); - - int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods(); - for (int executorConnectionId = ArcDatabase.EXECUTOR.getIndex(); executorConnectionId < ArcDatabase.EXECUTOR - .getIndex() + numberOfExecutorNods; executorConnectionId++) { - this.clientDao.dropPendingClientTables(executorConnectionId); - } - } - - - /** * create image tables on executor nods if connection is scaled, on coordinator * nod if not @@ -180,10 +193,16 @@ private void dropPendingClientTables() throws ArcException { * @param tablesMetierNames * @throws ArcException */ - private void createImages(List tablesMetierNames) throws ArcException { + private List createImages(List tablesMetierNames) throws ArcException { int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods(); + + List tablesToRetrieve = new ArrayList<>(); + if (numberOfExecutorNods == 0) { - clientDao.createImages(tablesMetierNames, ArcDatabase.COORDINATOR.getIndex()); + clientDao.createImages(tablesMetierNames, ArcDatabase.COORDINATOR.getIndex()) + .forEach(t -> tablesToRetrieve.add(new TableToRetrieve(ArcDatabase.COORDINATOR, 
t))); + + } else { for (int executorConnectionId = ArcDatabase.EXECUTOR.getIndex(); executorConnectionId < ArcDatabase.EXECUTOR .getIndex() + numberOfExecutorNods; executorConnectionId++) { @@ -192,9 +211,18 @@ private void createImages(List tablesMetierNames) throws ArcException { clientDao.copyTableOfIdSourceToExecutorNod(executorConnectionId); // create the business table containing data of id_source found in table tableOfIdSource - clientDao.createImages(tablesMetierNames, executorConnectionId); + clientDao.createImages(tablesMetierNames, executorConnectionId) + .forEach(t -> tablesToRetrieve.add(new TableToRetrieve(ArcDatabase.EXECUTOR, t))) + ; } } + + return tablesToRetrieve; + } + + private void exportToParquet(List tablesToExport) throws ArcException + { + clientDao.exportToParquet(tablesToExport); } } diff --git a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/ImportStep2GetTableNameService.java b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/ImportStep2GetTableNameService.java index e07c8f907..c24619feb 100644 --- a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/ImportStep2GetTableNameService.java +++ b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/ImportStep2GetTableNameService.java @@ -17,6 +17,8 @@ public class ImportStep2GetTableNameService { protected static final Logger LOGGER = LogManager.getLogger(ImportStep2GetTableNameService.class); + + private static final String NO_MORE_TABLE_TO_BE_RETRIEVED = " "; private ClientDao clientDao; @@ -80,7 +82,7 @@ public void execute(SendResponse resp) throws ArcException { table = this.clientDao.getAClientTableByType(ExportTrackingType.TRACK); this.clientDao.dropTable(table); - resp.send(" "); + resp.send(NO_MORE_TABLE_TO_BE_RETRIEVED); resp.endSending(); } diff --git a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/ImportStep3GetTableDataService.java 
b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/ImportStep3GetTableDataService.java index 15af98d60..4a10d6a12 100644 --- a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/ImportStep3GetTableDataService.java +++ b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/ImportStep3GetTableDataService.java @@ -1,16 +1,16 @@ package fr.insee.arc.ws.services.importServlet; +import java.io.IOException; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.json.JSONObject; import fr.insee.arc.utils.database.TableToRetrieve; import fr.insee.arc.utils.exception.ArcException; +import fr.insee.arc.utils.utils.LoggerHelper; import fr.insee.arc.utils.utils.Sleep; import fr.insee.arc.ws.services.importServlet.actions.SendResponse; import fr.insee.arc.ws.services.importServlet.bo.ArcClientIdentifier; -import fr.insee.arc.ws.services.importServlet.bo.ArcClientIdentifierUnsafe; -import fr.insee.arc.ws.services.importServlet.bo.ExportFormat; import fr.insee.arc.ws.services.importServlet.dao.ClientDao; import fr.insee.arc.ws.services.importServlet.dao.ServiceDao; @@ -37,18 +37,23 @@ public ImportStep3GetTableDataService(ArcClientIdentifier arcClientIdentifier) { public void execute(SendResponse resp) throws ArcException { TableToRetrieve table = clientDao.getAClientTableByName(arcClientIdentifier.getClientInputParameter()); - - // binary transfer - ServiceDao.execQueryExportDataToResponse(resp.getWr(), table, ExportFormat.isCsv(this.arcClientIdentifier.getFormat())); + + // transfer data to http response + ServiceDao.execQueryExportDataToResponse(resp.getWr(), table, this.arcClientIdentifier.getFormat(), clientDao); if (this.clientDao.isWebServiceNotPending()) { this.clientDao.dropTable(table); + this.clientDao.deleteParquet(table); this.clientDao.deleteFromTrackTable(table.getTableName()); + + LoggerHelper.info(LOGGER, "Table " + table.getTableName() + " had been retrieved and dropped."); + } else { 
Sleep.sleep(WAIT_DELAY_ON_PENDING_TABLES_CREATION_IN_MS); } - resp.endSending(); + resp.endSending(); + } } diff --git a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/bo/ArcClientIdentifier.java b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/bo/ArcClientIdentifier.java index e407b4d61..795034996 100644 --- a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/bo/ArcClientIdentifier.java +++ b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/bo/ArcClientIdentifier.java @@ -4,6 +4,7 @@ import java.util.Arrays; import java.util.List; +import fr.insee.arc.core.dataobjects.SchemaEnum; import fr.insee.arc.utils.database.Delimiters; import fr.insee.arc.utils.exception.ArcException; import fr.insee.arc.utils.exception.ArcExceptionMessage; @@ -55,7 +56,8 @@ public ArcClientIdentifier(ArcClientIdentifierUnsafe rawParameters, RemoteHost r this.clientInputParameter = rawParameters.getClientInputParameterUnsafe(); - this.environnement = SecurityDao.validateEnvironnement(rawParameters.getEnvironnementUnsafe()); + validateEnvironnement(); + this.clientIdentifier = WsSecurityDao.validateClientIdentifier(rawParameters.getClientIdentifierUnsafe()); this.timestamp = rawParameters.getTimestampUnsafe(); this.famille = rawParameters.getFamilleUnsafe(); @@ -75,6 +77,26 @@ public ArcClientIdentifier(ArcClientIdentifierUnsafe rawParameters, RemoteHost r } + /** + * validate environment declared + * arc metadata schema or execution sandbox schemas are valid + * @throws ArcException + */ + private void validateEnvironnement() throws ArcException { + + String environnementDeclared = unsafe.getEnvironnementUnsafe(); + + if (environnementDeclared.toLowerCase().equals(SchemaEnum.ARC_METADATA.getSchemaName())) + { + this.environnement = environnementDeclared; + return; + } + + this.environnement = SecurityDao.validateEnvironnement(environnementDeclared); + + } + + private void validateSource() throws ArcException { this.source = new ArrayList<>(); 
diff --git a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/bo/ExportFormat.java b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/bo/ExportFormat.java index b6fc99767..dc81597dd 100644 --- a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/bo/ExportFormat.java +++ b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/bo/ExportFormat.java @@ -3,13 +3,9 @@ public enum ExportFormat { CSV_GZIP, BINARY, PARQUET; - - public static boolean isCsv(ExportFormat clientFormat) { - return clientFormat.equals(CSV_GZIP); - } - - public static boolean isParquet(ExportFormat clientFormat) { - return clientFormat.equals(PARQUET); + + public boolean isParquet() { + return this.equals(PARQUET); } } diff --git a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/dao/ClientDao.java b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/dao/ClientDao.java index 32ae28c95..c123dbc6e 100644 --- a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/dao/ClientDao.java +++ b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/dao/ClientDao.java @@ -1,14 +1,15 @@ package fr.insee.arc.ws.services.importServlet.dao; +import java.io.File; import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.json.JSONObject; import fr.insee.arc.core.dataobjects.ArcPreparedStatementBuilder; import fr.insee.arc.core.dataobjects.ColumnEnum; @@ -16,6 +17,7 @@ import fr.insee.arc.core.model.TraitementEtat; import fr.insee.arc.core.model.TraitementPhase; import fr.insee.arc.core.service.global.dao.TableNaming; +import fr.insee.arc.core.service.p6export.parquet.ParquetDao; import fr.insee.arc.utils.dao.CopyObjectsToDatabase; import fr.insee.arc.utils.dao.SQL; import fr.insee.arc.utils.dao.UtilitaireDao; @@ -23,13 +25,15 @@ import 
fr.insee.arc.utils.database.TableToRetrieve; import fr.insee.arc.utils.exception.ArcException; import fr.insee.arc.utils.exception.ArcExceptionMessage; +import fr.insee.arc.utils.files.FileUtilsArc; +import fr.insee.arc.utils.ressourceUtils.PropertiesHandler; import fr.insee.arc.utils.security.SqlInjectionChecked; import fr.insee.arc.utils.structure.GenericBean; import fr.insee.arc.utils.utils.FormatSQL; import fr.insee.arc.utils.utils.LoggerHelper; import fr.insee.arc.ws.services.importServlet.bo.ArcClientIdentifier; import fr.insee.arc.ws.services.importServlet.bo.ExportTrackingType; -import fr.insee.arc.ws.services.importServlet.bo.JsonKeys; +import fr.insee.arc.ws.services.importServlet.provider.DirectoryPathExportWs; public class ClientDao { @@ -103,7 +107,7 @@ public boolean verificationClientFamille() throws ArcException { public List selectBusinessDataTables() throws ArcException { ArcPreparedStatementBuilder request = new ArcPreparedStatementBuilder(); - request.append("SELECT " + ColumnEnum.NOM_TABLE_METIER + " "); + request.append("SELECT DISTINCT " + ColumnEnum.NOM_TABLE_METIER + " "); request.append("FROM " + ViewEnum.MOD_TABLE_METIER.getFullName(environnement) + " T1 "); request.append("WHERE T1.id_famille='" + this.famille + "' "); request.append(";"); @@ -127,6 +131,9 @@ private void registerTableToBeRetrieved(ExportTrackingType wsTrackingType, ArcDa query.build(SQL.SELECT, query.quoteText(wsTrackingType.toString()), ",", query.quoteText(targetNod.toString()), ",", query.quoteText(nomTable)); UtilitaireDao.get(0).executeRequest(connection, query); + + LoggerHelper.info(LOGGER, "Register table "+ nomTable +" as "+ wsTrackingType.toString() + " on nod "+ targetNod.toString()); + } /** @@ -190,6 +197,20 @@ public void createTableTrackRetrievedTables() throws ArcException { registerTableToBeRetrieved(ExportTrackingType.TRACK, ArcDatabase.COORDINATOR, this.tableWsTracking); } + public long extractWsTrackingTimestamp() throws ArcException { + + String 
findClientTable = ViewEnum.normalizeTableName(regExpClientTable() + ViewEnum.WS_TRACKING.getTableName()); + + ArcPreparedStatementBuilder requete = new ArcPreparedStatementBuilder(); + requete.append("SELECT coalesce(max(substring(tablename," + requete.quoteText(findClientTable) + ")::bigint),0) as ts FROM pg_tables"); + requete.append(" WHERE tablename ~ " + requete.quoteText(findClientTable)); + requete.append(" AND schemaname = " + requete.quoteText(this.environnement) + " "); + + return UtilitaireDao.get(0).getLong(connection, requete); + + } + + /** * Create the container with all the files name (idSource) that will be retrieve * This query is built around the parameters given in the json request @@ -257,7 +278,6 @@ public void createTableOfIdSource() throws ArcException { */ public List createImages(List tablesMetierNames, int executorConnectionId) throws ArcException { LoggerHelper.debugAsComment(LOGGER, timestamp, "ClientDaoImpl.createImage()"); - List dataTableImages = new ArrayList<>(); for (String tableMetier : tablesMetierNames) { @@ -271,7 +291,7 @@ public List createImages(List tablesMetierNames, int executorCon * Create image of nomenclature tables * @throws ArcException */ - public void createTableNmcl() throws ArcException { + public List createTableNmcl() throws ArcException { LoggerHelper.debugAsComment(LOGGER, "ClientDaoImpl.createNmcl()"); ArcPreparedStatementBuilder requete = new ArcPreparedStatementBuilder(); @@ -281,12 +301,17 @@ public void createTableNmcl() throws ArcException { List> nmclNames = UtilitaireDao.get(0).executeRequestWithoutMetadata(connection, requete); + List tablesToRetrieve = new ArrayList<>(); + for (List nmcl : nmclNames) { String nomTableImage = ViewEnum.getFullNameNotNormalized(environnement, client + "_" + timestamp + "_" + nmcl.get(0)); UtilitaireDao.get(0).executeImmediate(connection, "CREATE TABLE " + nomTableImage + FormatSQL.WITH_NO_VACUUM + " AS SELECT * FROM " + ViewEnum.getFullName(environnement, nmcl.get(0)) 
+ ";"); registerTableToBeRetrieved(ExportTrackingType.DATA, ArcDatabase.COORDINATOR, nomTableImage); + tablesToRetrieve.add(new TableToRetrieve(ArcDatabase.COORDINATOR, nomTableImage)); } + + return tablesToRetrieve; } @@ -294,7 +319,7 @@ public void createTableNmcl() throws ArcException { * Create image of model tables * @throws ArcException */ - public void createTableVarMetier() throws ArcException { + public List createTableVarMetier() throws ArcException { LoggerHelper.debugAsComment(LOGGER, "ClientDaoImpl.createVarMetier()"); String nomTableImage = TableNaming.buildTableNameWithTokens(environnement, ViewEnum.MOD_VARIABLE_METIER, client, @@ -308,6 +333,9 @@ public void createTableVarMetier() throws ArcException { UtilitaireDao.get(0).executeRequest(connection, requete); registerTableToBeRetrieved(ExportTrackingType.DATA, ArcDatabase.COORDINATOR, nomTableImage); + + return Arrays.asList(new TableToRetrieve(ArcDatabase.COORDINATOR, nomTableImage)); + } @@ -317,7 +345,7 @@ public void createTableVarMetier() throws ArcException { * @see fr.insee.arc_essnet.ws.dao.ClientDarcleMetier(java.lang.String, * fr.insee.arc_essnet.ws.actions.Senarc */ - public void createTableMetier() throws ArcException { + public List createTableMetier() throws ArcException { LoggerHelper.debugAsComment(LOGGER, "ClientDaoImpl.sendTableMetier()"); String nomTableImage = TableNaming.buildTableNameWithTokens(environnement, ViewEnum.MOD_TABLE_METIER, client, @@ -331,6 +359,7 @@ public void createTableMetier() throws ArcException { UtilitaireDao.get(0).executeRequest(connection, requete); registerTableToBeRetrieved(ExportTrackingType.DATA, ArcDatabase.COORDINATOR, nomTableImage); + return Arrays.asList(new TableToRetrieve(ArcDatabase.COORDINATOR, nomTableImage)); } /* @@ -339,7 +368,7 @@ public void createTableMetier() throws ArcException { * @see fr.insee.arc_essnet.ws.dao.ClientDarcablesFamilles(long, * java.lang.String) */ - public void createTableFamille() throws ArcException { + public 
List createTableFamille() throws ArcException { LoggerHelper.debugAsComment(LOGGER, "ClientDaoImpl.createTableFamille()"); String nomTableImage = TableNaming.buildTableNameWithTokens(environnement, ViewEnum.EXT_MOD_FAMILLE, client, @@ -353,6 +382,7 @@ public void createTableFamille() throws ArcException { UtilitaireDao.get(0).executeRequest(connection, requete); registerTableToBeRetrieved(ExportTrackingType.DATA, ArcDatabase.COORDINATOR, nomTableImage); + return Arrays.asList(new TableToRetrieve(ArcDatabase.COORDINATOR, nomTableImage)); } @@ -362,7 +392,7 @@ public void createTableFamille() throws ArcException { * @see fr.insee.arc_essnet.ws.dao.ClientDarcablesFamilles(long, * java.lang.String) */ - public void createTablePeriodicite() throws ArcException { + public List createTablePeriodicite() throws ArcException { LoggerHelper.debugAsComment(LOGGER, "ClientDaoImpl.createTablePeriodicite()"); String nomTableImage = ViewEnum.getFullNameNotNormalized(environnement, @@ -372,7 +402,7 @@ public void createTablePeriodicite() throws ArcException { + " AS SELECT DISTINCT id, val FROM " + ViewEnum.EXT_MOD_PERIODICITE.getFullName() + ";"); registerTableToBeRetrieved(ExportTrackingType.DATA, ArcDatabase.COORDINATOR, nomTableImage); - + return Arrays.asList(new TableToRetrieve(ArcDatabase.COORDINATOR, nomTableImage)); } /** @@ -429,6 +459,8 @@ private void dropTable(int connectionIndex, String clientTable) { public void dropTable(TableToRetrieve table) { + LoggerHelper.info(LOGGER, "Drop table on " + table.getTableName()); + dropTable(ArcDatabase.COORDINATOR.getIndex(), table.getTableName()); int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods(); @@ -442,18 +474,60 @@ public void dropTable(TableToRetrieve table) { } + + /** + * return regexp to check client table name + * important because we do not want to delete wrong tables + * @return + */ + private String regExpClientTable() + { + return client + "_([0123456789]{13,})_.*"; + } + + + /** + * drop objects 
generated by a client during a former call + * @throws ArcException + */ + public void dropPendingClientObjects() throws ArcException + { + LoggerHelper.info(LOGGER, "Dropping all client objects for client : "+this.client); + + // drop tables from the client that had been requested from a former call + dropPendingClientTables(); + + // drop parquet files that had been requested from a former call + deletePendingParquet(); + } + + + /** + * drop tables on coordinator and executors if they exist + * @throws ArcException + */ + private void dropPendingClientTables() throws ArcException + { + dropPendingClientTables(0); + + int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods(); + for (int executorConnectionId = ArcDatabase.EXECUTOR.getIndex(); executorConnectionId < ArcDatabase.EXECUTOR + .getIndex() + numberOfExecutorNods; executorConnectionId++) { + dropPendingClientTables(executorConnectionId); + } + } + + /** * drop table from the client if some already exists * * @throws ArcException */ - public void dropPendingClientTables(int connectionId) throws ArcException { - - String findClientTable = ViewEnum.normalizeTableName(client + "\\_%"); - + private void dropPendingClientTables(int connectionId) throws ArcException { + String findClientTable = ViewEnum.normalizeTableName(regExpClientTable()); ArcPreparedStatementBuilder requete = new ArcPreparedStatementBuilder(); requete.append("SELECT schemaname||'.'||tablename as " + ColumnEnum.TABLE_NAME + " FROM pg_tables"); - requete.append(" WHERE tablename like " + requete.quoteText(findClientTable)); + requete.append(" WHERE tablename ~ " + requete.quoteText(findClientTable)); requete.append(" AND schemaname = " + requete.quoteText(this.environnement)); List tablesToDrop = new GenericBean(UtilitaireDao.get(connectionId).executeRequest(connection, requete)) @@ -468,7 +542,7 @@ public void dropPendingClientTables(int connectionId) throws ArcException { * * @throws ArcException */ - public void createTableWsInfo() throws 
ArcException { + public List createTableWsInfo() throws ArcException { ArcPreparedStatementBuilder requete = new ArcPreparedStatementBuilder(); requete.append("\n DROP TABLE IF EXISTS " + tableWsInfo + ";"); @@ -483,6 +557,8 @@ public void createTableWsInfo() throws ArcException { UtilitaireDao.get(0).executeImmediate(connection, requete); registerTableToBeRetrieved(ExportTrackingType.DATA, ArcDatabase.COORDINATOR, tableWsInfo); + + return Arrays.asList(new TableToRetrieve(ArcDatabase.COORDINATOR, tableWsInfo)); } @@ -532,10 +608,14 @@ public void copyTableOfIdSourceToExecutorNod(int connectionId) throws ArcExcepti } public void deleteFromTrackTable(String tableName) throws ArcException { + + LoggerHelper.info(LOGGER, "Unregister " + tableName); + ArcPreparedStatementBuilder query = new ArcPreparedStatementBuilder(); query.build(SQL.DELETE, this.tableWsTracking); - query.build(SQL.WHERE, "table_to_retrieve=", query.quoteText(tableName)); - UtilitaireDao.get(0).executeImmediate(connection, query); + query.build(SQL.WHERE, "table_to_retrieve=", query.quoteText(tableName), SQL.END_QUERY); + query.build(SQL.COMMIT, SQL.END_QUERY); + UtilitaireDao.get(0).executeRequest(connection, query); } public long getTimestamp() { @@ -570,6 +650,36 @@ public Connection getConnection() { return connection; } + public String getParquetDirectory() + { + PropertiesHandler properties = PropertiesHandler.getInstance(); + return DirectoryPathExportWs.directoryExport(properties.getBatchParametersDirectory(), arcClientIdentifier.getEnvironnement()); + } + + public void exportToParquet(List mappingTables) throws ArcException { + new ParquetDao().exportToParquet(mappingTables, getParquetDirectory(), null, true); + + } + + public void deleteParquet(TableToRetrieve table) throws ArcException { + FileUtilsArc.deleteIfExists(new File(ParquetDao.exportTablePath(table,getParquetDirectory()))); + } + + private void deletePendingParquet() throws ArcException + { + File parquetDirectory = new 
File(getParquetDirectory()); + + if (!parquetDirectory.exists()) + return; + + for (File t : parquetDirectory.listFiles()) + { + if (t.getName().matches(regExpClientTable())) + { + FileUtilsArc.deleteIfExists(t); + } + } + } } diff --git a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/dao/ServiceDao.java b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/dao/ServiceDao.java index ea54dea89..04775ebec 100644 --- a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/dao/ServiceDao.java +++ b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/dao/ServiceDao.java @@ -1,50 +1,103 @@ package fr.insee.arc.ws.services.importServlet.dao; +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; import java.util.zip.GZIPOutputStream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import fr.insee.arc.core.service.p6export.parquet.ParquetDao; import fr.insee.arc.utils.dao.UtilitaireDao; import fr.insee.arc.utils.database.ArcDatabase; import fr.insee.arc.utils.database.TableToRetrieve; import fr.insee.arc.utils.exception.ArcException; import fr.insee.arc.utils.exception.ArcExceptionMessage; +import fr.insee.arc.utils.files.CompressedUtils; +import fr.insee.arc.utils.utils.LoggerHelper; +import fr.insee.arc.ws.services.importServlet.bo.ExportFormat; public class ServiceDao { - public static void execQueryExportDataToResponse(OutputStream os, TableToRetrieve table, boolean csvExportFormat) throws ArcException { + protected static final Logger LOGGER = LogManager.getLogger(ServiceDao.class); + + public static void execQueryExportDataToResponse(OutputStream os, TableToRetrieve table, ExportFormat format, ClientDao clientDao) throws ArcException { + + LoggerHelper.info(LOGGER, "Transfer from " + table.getTableName() + " started"); + LoggerHelper.info(LOGGER, "Data format 
is " + format); + + switch(format) + { + case BINARY:exportBinary(os, table); break; + case CSV_GZIP:exportCsvGzip(os, table); break; + case PARQUET:exportParquet(os, table, clientDao); break; + } + LoggerHelper.info(LOGGER, "Transfer data from " + table.getTableName() + " ended"); + + } + + private static void exportBinary(OutputStream os, TableToRetrieve table) throws ArcException { + int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods(); + + // binary transfer cannot be scaled + if (numberOfExecutorNods>0) + { + throw new ArcException(ArcExceptionMessage.WS_RETRIEVE_DATA_SCALABLE_TABLE_MUST_BE_EXPORT_IN_CSV); + } + UtilitaireDao.get(0).exporting(null, table.getTableName(), os, false); + } - if (csvExportFormat) + private static void exportCsvGzip(OutputStream os, TableToRetrieve table) throws ArcException { + + int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods(); + + try(GZIPOutputStream goz=new GZIPOutputStream(os);) { - try(GZIPOutputStream goz=new GZIPOutputStream(os);) - { - if (table.getNod().equals(ArcDatabase.EXECUTOR) && numberOfExecutorNods>0) - { - for (int executorConnectionId = ArcDatabase.EXECUTOR.getIndex(); executorConnectionId < ArcDatabase.EXECUTOR - .getIndex() + numberOfExecutorNods; executorConnectionId++) { - UtilitaireDao.get(executorConnectionId).exporting(null, table.getTableName(), goz, csvExportFormat); - } - } - else - { - UtilitaireDao.get(0).exporting(null, table.getTableName(), goz, csvExportFormat); + if (table.getNod().equals(ArcDatabase.EXECUTOR) && numberOfExecutorNods>0) + { + for (int executorConnectionId = ArcDatabase.EXECUTOR.getIndex(); executorConnectionId < ArcDatabase.EXECUTOR + .getIndex() + numberOfExecutorNods; executorConnectionId++) { + UtilitaireDao.get(executorConnectionId).exporting(null, table.getTableName(), goz, true); } - } catch (IOException e) { - throw new ArcException(ArcExceptionMessage.STREAM_WRITE_FAILED); } - } - else - { - // binary transfer cannot be scaled - if 
(numberOfExecutorNods>0) + else { - throw new ArcException(ArcExceptionMessage.WS_RETRIEVE_DATA_SCALABLE_TABLE_MUST_BE_EXPORT_IN_CSV); + UtilitaireDao.get(0).exporting(null, table.getTableName(), goz, true); } - UtilitaireDao.get(0).exporting(null, table.getTableName(), os, csvExportFormat); + } catch (IOException e) { + throw new ArcException(ArcExceptionMessage.STREAM_WRITE_FAILED); + } + } + + + /** + * Export table to retrieve to a parquet file located in the sandbox directory + */ + private static void exportParquet(OutputStream os, TableToRetrieve table, ClientDao clientDao) throws ArcException { + + File fileToTransfer = new File(ParquetDao.exportTablePath(table,clientDao.getParquetDirectory())); + + try (FileInputStream fis = new FileInputStream(fileToTransfer); + BufferedInputStream bis = new BufferedInputStream(fis, CompressedUtils.READ_BUFFER_SIZE);) + { + byte[] buffer = new byte[CompressedUtils.READ_BUFFER_SIZE]; + int len; + while ((len = bis.read(buffer)) != -1) { + os.write(buffer, 0, len); + } + } catch (FileNotFoundException e) { + throw new ArcException(e, ArcExceptionMessage.FILE_READ_FAILED, fileToTransfer); + } catch (IOException e) { + throw new ArcException(e, ArcExceptionMessage.FILE_READ_FAILED, fileToTransfer); } } + } diff --git a/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/provider/DirectoryPathExportWs.java b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/provider/DirectoryPathExportWs.java new file mode 100644 index 000000000..e16728ced --- /dev/null +++ b/arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/provider/DirectoryPathExportWs.java @@ -0,0 +1,25 @@ +package fr.insee.arc.ws.services.importServlet.provider; + +import fr.insee.arc.core.service.global.dao.FileSystemManagement; + +public class DirectoryPathExportWs { + + private DirectoryPathExportWs() { + throw new IllegalStateException("Utility class"); + } + + private static final String WS_EXPORT_ROOT = "EXPORT_WS"; + + /** + * Methods to 
provide directories paths + * + * @param rootDirectory + * @param env + * @return + */ + + public static String directoryExport(String rootDirectory, String env) { + return FileSystemManagement.directoryRootSubdirectories(rootDirectory, env, WS_EXPORT_ROOT); + } + +} diff --git a/arc-ws/src/test/java/fr/insee/arc/ws/services/importServlet/bo/ExportFormatTest.java b/arc-ws/src/test/java/fr/insee/arc/ws/services/importServlet/bo/ExportFormatTest.java index c478a3f23..5f435ad86 100644 --- a/arc-ws/src/test/java/fr/insee/arc/ws/services/importServlet/bo/ExportFormatTest.java +++ b/arc-ws/src/test/java/fr/insee/arc/ws/services/importServlet/bo/ExportFormatTest.java @@ -7,9 +7,10 @@ public class ExportFormatTest { @Test - public void isCsv() { - assertFalse(ExportFormat.isCsv(ExportFormat.BINARY)); - assertTrue(ExportFormat.isCsv(ExportFormat.CSV_GZIP)); + public void exportFormat_isParquet_test() { + assertTrue(ExportFormat.PARQUET.isParquet()); + assertFalse(ExportFormat.BINARY.isParquet()); + } }