Skip to content

Commit

Permalink
feat: version 94.1.10
Browse files Browse the repository at this point in the history
  • Loading branch information
Nolife999 committed Mar 27, 2024
1 parent 8c28750 commit 145d84a
Show file tree
Hide file tree
Showing 35 changed files with 361 additions and 72 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

All notable changes to this project will be documented in this file.

## version-94.1.10
- bug fix for gz archive entry name
- parquet export

## version-94.1.9
- bug fix with filter gui

Expand Down
5 changes: 3 additions & 2 deletions arc-batch/src/main/java/fr/insee/arc/batch/BatchARC.java
Original file line number Diff line number Diff line change
Expand Up @@ -475,13 +475,14 @@ private void initializeBatchLoop(List<TraitementPhase> phases,

// initialiser le tableau de phase
int startingPhase = TraitementPhase.CHARGEMENT.getOrdre();
int endPhase = TraitementPhase.MAPPING.getOrdre();

for (TraitementPhase phase : TraitementPhase.values()) {
if (phase.getOrdre() >= startingPhase) {
if (phase.getOrdre() >= startingPhase && phase.getOrdre() <= endPhase) {
phases.add(phase.getOrdre() - startingPhase, phase);
}
}

// initialiser le pool de thread
for (TraitementPhase phase : phases) {
pool.put(phase, new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package fr.insee.arc.core.factory;

import fr.insee.arc.core.model.TraitementPhase;
import fr.insee.arc.core.service.global.ApiService;
import fr.insee.arc.core.service.p6export.ApiExportService;

/**
 * {@link IServiceFactory} implementation that builds the service executing the
 * EXPORT phase ({@link ApiExportService}).
 */
public class ApiExportServiceFactory implements IServiceFactory {

	/**
	 * @return a fresh factory instance, exposed as the {@link IServiceFactory}
	 *         interface so it can be registered in the phase/factory map
	 */
	public static IServiceFactory getInstance() {
		return new ApiExportServiceFactory();
	}

	@Override
	public ApiService get(TraitementPhase phaseService, String executionSchema, Integer capacityParameter, String paramBatch) {
		// delegate straight to the export service constructor
		return new ApiExportService(phaseService, executionSchema, capacityParameter, paramBatch);
	}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ private ApiServiceFactory(Map<TraitementPhase, IServiceFactory> aMap) {
ApiControleServiceFactory.getInstance());
this.map.put(TraitementPhase.MAPPING,
ApiMappingServiceFactory.getInstance());
this.map.put(TraitementPhase.EXPORT,
ApiExportServiceFactory.getInstance());
}

private static final ApiServiceFactory getInstance() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package fr.insee.arc.core.model;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public enum TraitementPhase {
DUMMY(-1, 1), INITIALISATION(0, 1000000), RECEPTION(1, 1000000), CHARGEMENT(2, 1000000), NORMAGE(3, 1000000)
, CONTROLE(4, 1000000), MAPPING(5, 1000000);
, CONTROLE(4, 1000000), MAPPING(5, 1000000), EXPORT(6,1);
private TraitementPhase(int anOrdre, int aNbLigneATraiter) {
this.ordre = anOrdre;
this.nbLigneATraiter = aNbLigneATraiter;
Expand Down Expand Up @@ -80,18 +81,26 @@ public TraitementPhase previousPhase() {
return phase;
}

public static List<TraitementPhase> listPhasesAfterPhase(TraitementPhase phase) {
public static List<TraitementPhase> listPhasesBetween(TraitementPhase phase, TraitementPhase phaseEnd) {
List<TraitementPhase> listePhaseC = new ArrayList<>();
for (TraitementPhase t : values()) {
if (t.getOrdre()>=phase.getOrdre()) {
if (t.getOrdre()>=phase.getOrdre() && t.getOrdre()<=phaseEnd.getOrdre()) {
listePhaseC.add(t);
}
}
return listePhaseC;
}

public static List<TraitementPhase> getListPhaseC() {
return listPhasesAfterPhase(TraitementPhase.INITIALISATION);
public static List<TraitementPhase> getListPhaseExecutableInBas() {
return listPhasesBetween(TraitementPhase.INITIALISATION, TraitementPhase.MAPPING);
}

/**
 * Phases reported in the sandbox ("bas", presumably "bac à sable" — confirm),
 * i.e. every phase from RECEPTION up to and including MAPPING.
 *
 * @return the ordered list of reported phases
 */
public static List<TraitementPhase> getListPhaseReportedInBas() {
return listPhasesBetween(TraitementPhase.RECEPTION, TraitementPhase.MAPPING);
}

/**
 * Phases executed unconditionally on every run: for these,
 * {@code ApiService.invokeApi} sets {@code todo = true} without checking the
 * pilotage table for work left by the previous phase.
 *
 * @return fixed-size list (Arrays.asList) of the always-executed phases
 */
public static List<TraitementPhase> getListPhaseAlwaysTodo() {
return Arrays.asList(TraitementPhase.INITIALISATION, TraitementPhase.RECEPTION, TraitementPhase.EXPORT);
}

public String tableRegleOfPhaseInSandbox() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,7 @@ public ServiceReporting invokeApi() {

LoggerHelper.info(LOGGER_APISERVICE, "****** Execution " + this.getCurrentPhase() + " *******");
try {

if (this.getCurrentPhase().equals(TraitementPhase.INITIALISATION)
|| this.getCurrentPhase().equals(TraitementPhase.RECEPTION)) {
if (TraitementPhase.getListPhaseAlwaysTodo().contains(this.getCurrentPhase())) {
this.todo = true;
} else {
this.todo = checkTodo(this.getTablePil(), this.getPreviousPhase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,17 @@ public static boolean isEnvSetForProduction(String env) {
return !found.isEmpty();
}

/**
 * Tells whether the given environment is declared in the batch-mode list.
 * The list is read from the database parameter {@code ArcAction.batchMode},
 * stored as a JSON array of environment names (defaults to an empty array).
 *
 * NOTE(review): returns a String flag instead of a boolean to keep the
 * existing caller contract — "1" when found, {@code null} otherwise.
 *
 * @param env the environment identifier to look up
 * @return "1" if {@code env} is listed for batch mode, {@code null} otherwise
 */
public static String isEnvSetForBatch(String env) {
	JSONArray batchEnvironments = new JSONArray(
			new BDParameters(ArcDatabase.COORDINATOR).getString(null, "ArcAction.batchMode", "[]"));

	// return as soon as a match is found; no need to collect matches in a Set
	for (Object item : batchEnvironments) {
		if (item.toString().equals(env)) {
			return "1";
		}
	}
	return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ private FileSystemManagement() {
/**
* Directory management
*/
private static final String DIRECTORY_EXPORT_QUALIFIIER = "EXPORT";

private static final String DIRECTORY_TOKEN = "_";

private static final String DIRECTORY_ARCHIVE_QUALIFIIER = "ARCHIVE";
Expand All @@ -37,10 +35,17 @@ public static String directoryPhaseRoot(String rootDirectory, String env, Traite
return directoryEnvRoot(rootDirectory, env) + File.separator + t.toString();
}

public static String directoryEnvExport(String rootDirectory, String env) {
return directoryEnvRoot(rootDirectory, env) + File.separator + DIRECTORY_EXPORT_QUALIFIIER;
/**
 * Builds the path of a subdirectory tree under a phase root directory:
 * {@code <envRoot>/<PHASE>/<sub1>/<sub2>/...}.
 *
 * @param rootDirectory  filesystem root of the ARC directories
 * @param env            environment (sandbox) name
 * @param t              phase whose root directory is the base of the path
 * @param subdirectories optional subdirectory names appended in order
 * @return the assembled path (no trailing separator)
 */
public static String directoryPhaseRootSubdirectories(String rootDirectory, String env, TraitementPhase t, String... subdirectories) {

	// reuse directoryPhaseRoot instead of re-deriving envRoot + phase name
	StringBuilder directoryPath = new StringBuilder(directoryPhaseRoot(rootDirectory, env, t));
	for (String subdirectory : subdirectories) {
		directoryPath.append(File.separator).append(subdirectory);
	}
	return directoryPath.toString();
}

/**
 * Builds the entrepot-specific phase directory: {@code <phaseRoot>_<entrepot>}.
 *
 * @param rootDirectory filesystem root of the ARC directories
 * @param env           environment (sandbox) name
 * @param t             phase whose root directory is suffixed
 * @param entrepot      entrepot identifier appended after the token
 * @return the assembled directory path
 */
public static String directoryPhaseEntrepot(String rootDirectory, String env, TraitementPhase t, String entrepot) {
	String phaseRoot = directoryPhaseRoot(rootDirectory, env, t);
	return phaseRoot + DIRECTORY_TOKEN + entrepot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void clearPilotageAndDirectories(String repertoire) throws ArcException {
FileUtilsArc.deleteAndRecreateDirectory(
Paths.get(DirectoryPath.directoryReceptionEtatKO(repertoire, envExecution)).toFile());
FileUtilsArc.deleteAndRecreateDirectory(
Paths.get(FileSystemManagement.directoryEnvExport(repertoire, envExecution)).toFile());
Paths.get(FileSystemManagement.directoryPhaseRoot(repertoire, envExecution, TraitementPhase.EXPORT)).toFile());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
import fr.insee.arc.core.service.p1reception.registerarchive.bo.FileDescriber;
import fr.insee.arc.core.service.p1reception.registerarchive.bo.FilesDescriber;
import fr.insee.arc.core.service.p1reception.registerarchive.bo.IArchiveStream;
import fr.insee.arc.utils.exception.ArcException;
import fr.insee.arc.utils.exception.ArcExceptionMessage;
import fr.insee.arc.utils.files.CompressionExtension;

/**
* Class to check archive
*
* @author FY2QEQ
*
*/
Expand All @@ -21,10 +25,9 @@ public class ArchiveCheckOperation {
private int erreur;
private TraitementEtat etat;
private String rapport;

IArchiveStream archiveStream;
Entry currentEntry;


public ArchiveCheckOperation(IArchiveStream archiveStream) {
super();
Expand All @@ -33,54 +36,55 @@ public ArchiveCheckOperation(IArchiveStream archiveStream) {

/**
* Checks every file in the archive (tar.gz/tgz, zip or gz) and returns the archive content.
* @throws ArcException
*/
public FilesDescriber checkArchive(File f, String entrepot) {
public FilesDescriber checkArchive(File f, String entrepot) throws ArcException {
// Inscription des fichiers au contenu de l'archive

FilesDescriber contentTemp = new FilesDescriber();

setStatus(0, null, null);

// Check if the archive is fully readable
try
{
try {
archiveStream.startInputStream(f);
// default case if archive is empty of real files
setStatus(1, TraitementEtat.KO, TraitementRapport.INITIALISATION_CORRUPTED_ARCHIVE.toString());

this.currentEntry = archiveStream.getEntry();
this.currentEntry = archiveStream.getEntry();

// Check every entry
while (currentEntry != null) {

if (currentEntry.isDirectory()) {
currentEntry = archiveStream.getEntry();
currentEntry = archiveStream.getEntry();
} else {
setStatus(0, TraitementEtat.OK, null);

String name = currentEntry.getName();


// prefix entry name with entrepot
String entryNamePrefixedWithEntrepot = addEntrepotPrefixToEntryName(entrepot, f.getName(), name);

validateEntry();

contentTemp.add(new FileDescriber(f.getName(), entrepot + name,
TraitementTypeFichier.DA, etat, rapport, null));
contentTemp.add(new FileDescriber(f.getName(), entryNamePrefixedWithEntrepot , TraitementTypeFichier.DA, etat,
rapport, null));

rapport = null;
}
}
} catch (IOException e1) {
erreur = 1;
rapport = TraitementRapport.INITIALISATION_CORRUPTED_ARCHIVE.toString();
}
finally
{
} finally {
archiveStream.close();
}

// Inscription de l'archive
contentTemp.add(new FileDescriber(f.getName(), null,
erreur == 1 ? TraitementTypeFichier.AC : TraitementTypeFichier.A,
erreur > 0 ? TraitementEtat.KO : TraitementEtat.OK, rapport, null));
contentTemp.add(
new FileDescriber(f.getName(), null, erreur == 1 ? TraitementTypeFichier.AC : TraitementTypeFichier.A,
erreur > 0 ? TraitementEtat.KO : TraitementEtat.OK, rapport, null));

// If there is any error, all files are marked KO with a special report
if (erreur > 0) {
Expand All @@ -94,6 +98,37 @@ public FilesDescriber checkArchive(File f, String entrepot) {
return contentTemp;
}

/**
 * Prefixes the entrepot to an archive entry name when required.
 *
 * GZ entries carry the name of the original archive, which already contains
 * the entrepot, so the entrepot must NOT be added again. For the other
 * supported archive types (tar.gz, tgz, zip) entry names are independent of
 * the archive name, so the entrepot prefix is added.
 *
 * @param entrepot    entrepot identifier used as prefix
 * @param archiveName name of the archive file (used to detect the format)
 * @param entryName   raw entry name read from the archive
 * @return the entry name, prefixed with the entrepot when the format requires it
 * @throws ArcException if the archive extension is not a supported format
 */
protected static String addEntrepotPrefixToEntryName(String entrepot, String archiveName, String entryName) throws ArcException {

	// NOTE: .tar.gz/.tgz must be tested before .gz — a ".tar.gz" name also
	// ends with ".gz", so reordering these checks would change behavior
	if (archiveName.endsWith(CompressionExtension.TAR_GZ.getFileExtension())
			|| archiveName.endsWith(CompressionExtension.TGZ.getFileExtension())
			|| archiveName.endsWith(CompressionExtension.ZIP.getFileExtension())) {
		return entrepot + entryName;
	}

	if (archiveName.endsWith(CompressionExtension.GZ.getFileExtension())) {
		return entryName;
	}

	throw new ArcException(ArcExceptionMessage.INVALID_FILE_FORMAT);
}

private void setStatus(int erreur, TraitementEtat etat, String rapport) {
this.erreur = erreur;
this.etat = etat;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package fr.insee.arc.core.service.p6export;

import java.util.ArrayList;
import java.util.List;

import fr.insee.arc.core.dataobjects.ArcPreparedStatementBuilder;
import fr.insee.arc.core.dataobjects.ColumnEnum;
import fr.insee.arc.core.dataobjects.ViewEnum;
import fr.insee.arc.core.model.TraitementPhase;
import fr.insee.arc.core.service.global.ApiService;
import fr.insee.arc.core.service.p6export.parquet.ParquetDao;
import fr.insee.arc.core.service.p6export.parquet.ParquetEncryptionKey;
import fr.insee.arc.core.service.p6export.provider.DirectoryPathExport;
import fr.insee.arc.utils.dao.SQL;
import fr.insee.arc.utils.dao.UtilitaireDao;
import fr.insee.arc.utils.database.ArcDatabase;
import fr.insee.arc.utils.database.TableToRetrieve;
import fr.insee.arc.utils.exception.ArcException;
import fr.insee.arc.utils.ressourceUtils.PropertiesHandler;
import fr.insee.arc.utils.structure.GenericBean;

public class ApiExportService extends ApiService {

Expand All @@ -23,7 +33,31 @@ public ApiExportService(TraitementPhase aCurrentPhase, String anEnvironnementExe

@Override
/**
 * EXPORT phase entry point: exports every business mapping table of the
 * sandbox to parquet files in a timestamped export directory.
 */
public void executer() throws ArcException {

// database-side timestamp string used to name the export directory
ArcPreparedStatementBuilder query;
query = new ArcPreparedStatementBuilder();
query.build(SQL.SELECT, "to_char(localtimestamp,'YYYY_MM_DD_HH24_MI_SS_MS')");
String dateExport = UtilitaireDao.get(0).getString(this.connexion.getCoordinatorConnection(), query);


// select the tables to be exported: the business mapping table names
// declared in ihm_mod_table_metier (queried on the coordinator connection)
query = new ArcPreparedStatementBuilder();
query.build(SQL.SELECT, ColumnEnum.NOM_TABLE_METIER, SQL.FROM, ViewEnum.IHM_MOD_TABLE_METIER.getFullName());

List<String> mappingTablesName = new GenericBean(UtilitaireDao.get(0).executeRequest(this.connexion.getCoordinatorConnection(), query))
.getColumnValues(ColumnEnum.NOM_TABLE_METIER.getColumnName());

List<TableToRetrieve> tablesToExport = new ArrayList<>();
// business mapping tables are retrieved from the EXECUTOR database nods,
// qualified with the execution schema
mappingTablesName.stream().forEach(t -> tablesToExport.add(new TableToRetrieve(ArcDatabase.EXECUTOR , ViewEnum.getFullName(this.envExecution, t))));


PropertiesHandler properties = PropertiesHandler.getInstance();

// no encryption key (third argument null) — TODO confirm this is intended
exportToParquet(tablesToExport, DirectoryPathExport.directoryExport(properties.getBatchParametersDirectory(), this.envExecution, dateExport), null);


}

protected void exportToParquet(List<TableToRetrieve> tables, String outputDirectory,
Expand Down
Loading

0 comments on commit 145d84a

Please sign in to comment.