diff --git a/api/src/modules/import-data/sourcing-data/sourcing-data-import.service.ts b/api/src/modules/import-data/sourcing-data/sourcing-data-import.service.ts index bafe11998..e86ca2608 100644 --- a/api/src/modules/import-data/sourcing-data/sourcing-data-import.service.ts +++ b/api/src/modules/import-data/sourcing-data/sourcing-data-import.service.ts @@ -27,6 +27,8 @@ import { GeoRegionsService } from 'modules/geo-regions/geo-regions.service'; import { GeoCodingAbstractClass } from 'modules/geo-coding/geo-coding-abstract-class'; import { MissingH3DataError } from 'modules/indicator-records/errors/missing-h3-data.error'; import { TasksService } from 'modules/tasks/tasks.service'; +import { TASK_STEP } from 'modules/tasks/task.entity'; +import { IndicatorRecord } from 'modules/indicator-records/indicator-record.entity'; export interface LocationData { locationAddressInput?: string; @@ -77,6 +79,10 @@ export class SourcingDataImportService { this.logger.log(`Starting import process`); await this.fileService.isFilePresentInFs(filePath); try { + await this.tasksService.updateImportJobEvent({ + taskId, + currentStep: TASK_STEP.SAVING_MSB, + }); const parsedXLSXDataset: SourcingRecordsSheets = await this.fileService.transformToJson(filePath, SHEETS_MAP); @@ -114,6 +120,11 @@ export class SourcingDataImportService { materials, dtoMatchedData.sourcingData, ); + + await this.tasksService.updateImportJobEvent({ + taskId, + currentStep: TASK_STEP.GEOLOCATING, + }); // TODO: TBD What to do when there is some location where we cannot determine its admin-region: i.e coordinates // in the middle of the sea const geoCodedSourcingData: SourcingData[] = @@ -138,7 +149,20 @@ export class SourcingDataImportService { // Getting H3 data for calculations is done within DB so we need to improve the error handling // TBD: What to do when there is no H3 for a Material try { - await this.indicatorRecordsService.createIndicatorRecordsForAllSourcingRecords(); + await this.tasksService.updateImportJobEvent({ + taskId, + currentStep: TASK_STEP.CALCULATING_IMPACT, + }); + const indicatorRecords: IndicatorRecord[] = + await this.indicatorRecordsService.createIndicatorRecordsForAllSourcingRecords(); + + await this.tasksService.updateImportJobEvent({ + taskId, + currentStep: TASK_STEP.SAVING_IMPACT, + }); + await this.indicatorRecordsService.saveIndicatorRecordsForAllSourcingRecords( + indicatorRecords, + ); this.logger.log('Indicator Records generated'); } catch (err: any) { if (err instanceof MissingH3DataError) { @@ -150,6 +174,10 @@ export class SourcingDataImportService { } } finally { await this.fileService.deleteDataFromFS(filePath); + await this.tasksService.updateImportJobEvent({ + taskId, + currentStep: TASK_STEP.DONE, + }); } } diff --git a/api/src/modules/indicator-records/indicator-records.service.ts b/api/src/modules/indicator-records/indicator-records.service.ts index d2f2f9a34..def682d2a 100644 --- a/api/src/modules/indicator-records/indicator-records.service.ts +++ b/api/src/modules/indicator-records/indicator-records.service.ts @@ -145,7 +145,9 @@ export class IndicatorRecordsService extends AppBaseService< * @description Creates Indicator Records from all existing Sourcing Records in the DB */ // TODO still not adapted to modular indicators, because the performance gets hit drastically on the source import. Pending to be worked on - async createIndicatorRecordsForAllSourcingRecords(): Promise { + async createIndicatorRecordsForAllSourcingRecords(): Promise< + IndicatorRecord[] + > { const rawData: SourcingRecordsWithIndicatorRawDataDto[] = await this.indicatorRecordRepository.getIndicatorRawDataForAllSourcingRecords(); const calculatedData: IndicatorRecordCalculatedValuesDto[] = rawData.map( @@ -209,6 +211,15 @@ export class IndicatorRecordsService extends AppBaseService< ); }, ); + return indicatorRecords; + } + + /** + * @description Save Indicator Records + */ + async saveIndicatorRecordsForAllSourcingRecords( + indicatorRecords: IndicatorRecord[], + ): Promise { await this.indicatorRecordRepository.saveChunks(indicatorRecords); } diff --git a/api/src/modules/tasks/task.entity.ts b/api/src/modules/tasks/task.entity.ts index b33a84d6d..9b2bcd663 100644 --- a/api/src/modules/tasks/task.entity.ts +++ b/api/src/modules/tasks/task.entity.ts @@ -20,6 +20,24 @@ export enum TASK_STATUS { FAILED = 'failed', } +export enum TASK_STEP { + UNKNOWN = 'unknown', + SAVING_MSB = 'saving_msb', // Materials, Suppliers, Business-Units + GEOLOCATING = 'geolocating', + CALCULATING_IMPACT = 'calculating_impact', + SAVING_IMPACT = 'saving_impact', + DONE = 'done', +} + +export const stepToPercentage: { [key in TASK_STEP]: number } = { + unknown: 0.0, + saving_msb: 0.0, + geolocating: 25.0, + calculating_impact: 50.0, + saving_impact: 75.0, + done: 100.0, +}; + export enum TASK_TYPE { UNKNOWN = 'unknown', SOURCING_DATA_IMPORT = 'sourcing_data_import', @@ -47,6 +65,14 @@ export class Task extends TimestampedBaseEntity { @Column({ type: 'enum', enum: TASK_STATUS, default: TASK_STATUS.PROCESSING }) status!: TASK_STATUS; + @ApiProperty() + @Column({ type: 'enum', enum: TASK_STEP, default: TASK_STEP.UNKNOWN }) + currentStep: TASK_STEP; + + @ApiProperty() + @Column({ type: 'decimal', nullable: false, default: 0.0 }) + progress: number; + @ApiProperty() @Column({ type: 'json' }) data!: Record; diff --git a/api/src/modules/tasks/tasks.service.ts b/api/src/modules/tasks/tasks.service.ts index ba239e563..4dca7058a 100644 --- a/api/src/modules/tasks/tasks.service.ts +++ b/api/src/modules/tasks/tasks.service.ts @@ -4,6 +4,8 @@ import { TASK_TYPE, Task, taskResource, + TASK_STEP, + stepToPercentage, } from 'modules/tasks/task.entity'; import { AppBaseService, @@ -43,6 +45,8 @@ export class TasksService extends AppBaseService< 'errors', 'logs', 'userId', + 'progress', + 'currentStep', ], keyForAttribute: 'camelCase', }; @@ -76,6 +80,7 @@ export class TasksService extends AppBaseService< newData?: Record; newErrors?: Error; newLogs?: string[]; + currentStep?: TASK_STEP; }): Promise { /** * @debt @@ -85,7 +90,8 @@ export class TasksService extends AppBaseService< * @todo: Make this work nicely in distributed systems. * */ - const { taskId, newStatus, newData, newErrors, newLogs } = updateTask; + const { taskId, newStatus, newData, newErrors, newLogs, currentStep } = + updateTask; const task: Task | undefined = await this.taskRepository.findOne(taskId); if (!task) { throw new NotFoundException(`Could not found Task with ID: ${taskId}`); @@ -108,6 +114,11 @@ export class TasksService extends AppBaseService< ); } + if (currentStep) { + task.currentStep = currentStep; + task.progress = stepToPercentage[currentStep]; + } + if (newStatus) task.status = newStatus; return task.save(); } diff --git a/api/test/e2e/tasks/tasks.spec.ts b/api/test/e2e/tasks/tasks.spec.ts index a9831d2bd..eb1180b13 100644 --- a/api/test/e2e/tasks/tasks.spec.ts +++ b/api/test/e2e/tasks/tasks.spec.ts @@ -2,7 +2,12 @@ import { Test, TestingModule } from '@nestjs/testing'; import { HttpStatus, INestApplication, ValidationPipe } from '@nestjs/common'; import * as request from 'supertest'; import { AppModule } from 'app.module'; -import { Task, TASK_STATUS, TASK_TYPE } from 'modules/tasks/task.entity'; +import { + Task, + TASK_STATUS, + TASK_STEP, + TASK_TYPE, +} from 'modules/tasks/task.entity'; import { TasksModule } from 'modules/tasks/tasks.module'; import { TasksRepository } from 'modules/tasks/tasks.repository'; import { createTask } from '../../entity-mocks'; @@ -157,6 +162,11 @@ describe('Tasks Module (e2e)', () => { .expect(HttpStatus.OK); expect(response.body.data[0].id).toEqual(task1.id); + expect(response.body.data[0].attributes.progress).toEqual('0'); + expect(response.body.data[0].attributes.currentStep).toEqual( + TASK_STEP.UNKNOWN, + ); + expect(response.body.data[0].attributes.status).toEqual('processing'); expect(response.body.data[1].id).toEqual(task2.id); expect(response.body.data[1].attributes.status).toEqual('failed'); diff --git a/api/test/integration/import-data/xlsx-uploads/import-data-produce-jobs.spec.ts b/api/test/integration/import-data/xlsx-uploads/import-data-produce-jobs.spec.ts index 3bf3df6b2..4bbe43e97 100644 --- a/api/test/integration/import-data/xlsx-uploads/import-data-produce-jobs.spec.ts +++ b/api/test/integration/import-data/xlsx-uploads/import-data-produce-jobs.spec.ts @@ -6,7 +6,7 @@ import { getQueueToken } from '@nestjs/bull'; import { importQueueName } from 'modules/import-data/workers/import-queue.name'; import { ImportDataService } from 'modules/import-data/import-data.service'; import { TasksRepository } from 'modules/tasks/tasks.repository'; -import { Task } from 'modules/tasks/task.entity'; +import { Task, TASK_STEP } from 'modules/tasks/task.entity'; // TODO: Update tests below once auth is done describe('XLSX Upload Feature Job Producer Tests', () => { @@ -50,6 +50,7 @@ describe('XLSX Upload Feature Job Producer Tests', () => { }); expect(tasks[0].status).toEqual('processing'); expect(tasks[0].userId).toEqual(userId); + expect(tasks[0].currentStep).toEqual(TASK_STEP.UNKNOWN); }, 100000); test('When loadXlsxFile is called with required file data and a userId, but the Job can not be added to the queue, and the related tasks should be removed', async () => { await bootstrapTestingApp(importQueueFail); diff --git a/api/test/integration/import-data/xlsx-uploads/sourcing-data-import.spec.ts b/api/test/integration/import-data/xlsx-uploads/sourcing-data-import.spec.ts index 04f60eaf2..2111e804b 100644 --- a/api/test/integration/import-data/xlsx-uploads/sourcing-data-import.spec.ts +++ b/api/test/integration/import-data/xlsx-uploads/sourcing-data-import.spec.ts @@ -25,6 +25,7 @@ import { createSourcingLocation, createSourcingRecord, createSupplier, + createTask, } from '../../../entity-mocks'; import { GeoRegion } from 'modules/geo-regions/geo-region.entity'; import { UnknownLocationGeoCodingStrategy } from 'modules/geo-coding/strategies/unknown-location.geocoding.service'; @@ -51,6 +52,8 @@ import { GeoCodingAbstractClass } from 'modules/geo-coding/geo-coding-abstract-c import { Material } from 'modules/materials/material.entity'; import { ScenarioIntervention } from 'modules/scenario-interventions/scenario-intervention.entity'; import { SourcingRecordsWithIndicatorRawDataDto } from 'modules/sourcing-records/dto/sourcing-records-with-indicator-raw-data.dto'; +import { Task, TASK_STEP } from 'modules/tasks/task.entity'; +import { TasksRepository } from 'modules/tasks/tasks.repository'; let tablesToDrop: string[] = []; let missingDataFallbackPolicy: string = 'error'; @@ -133,6 +136,7 @@ describe('Sourcing Data import', () => { } let businessUnitRepository: BusinessUnitRepository; + let taskRepository: TasksRepository; let materialRepository: MaterialRepository; let materialToH3Service: MaterialsToH3sService; let supplierRepository: SupplierRepository; @@ -163,6 +167,7 @@ describe('Sourcing Data import', () => { businessUnitRepository = moduleFixture.get( BusinessUnitRepository, ); + taskRepository = moduleFixture.get(TasksRepository); materialRepository = moduleFixture.get(MaterialRepository); materialToH3Service = moduleFixture.get( @@ -198,6 +203,7 @@ describe('Sourcing Data import', () => { afterEach(async () => { await materialToH3Service.delete({}); + await taskRepository.delete({}); await materialRepository.delete({}); await indicatorRepository.delete({}); await businessUnitRepository.delete({}); @@ -223,10 +229,11 @@ describe('Sourcing Data import', () => { test('When a file is processed by the API and there are no materials in the database, an error should be displayed', async () => { expect.assertions(1); + const task: Task = await createTask(); try { await sourcingDataImportService.importSourcingData( __dirname + '/base-dataset.xlsx', - '', + task.id, ); } catch (err: any) { expect(err.message).toEqual( @@ -239,6 +246,7 @@ describe('Sourcing Data import', () => { const geoRegion: GeoRegion = await createGeoRegion({ isCreatedByUser: false, }); + const task: Task = await createTask(); await createAdminRegion({ isoA2: 'ABC', geoRegion, @@ -254,7 +262,7 @@ describe('Sourcing Data import', () => { await sourcingDataImportService.importSourcingData( __dirname + '/base-dataset.xlsx', - '', + task.id, ); }, 100000); @@ -262,6 +270,7 @@ describe('Sourcing Data import', () => { const geoRegion: GeoRegion = await createGeoRegion({ isCreatedByUser: false, }); + let task: Task = await createTask(); await createAdminRegion({ isoA2: 'ABC', geoRegion, @@ -276,9 +285,13 @@ describe('Sourcing Data import', () => { await sourcingDataImportService.importSourcingData( __dirname + '/base-dataset.xlsx', - '', + task.id, ); + task = await taskRepository.findOneOrFail({ id: task.id }); + expect(task.progress).toEqual('100'); + expect(task.currentStep).toEqual(TASK_STEP.DONE); + const businessUnits: BusinessUnit[] = await businessUnitRepository.find(); expect(businessUnits).toHaveLength(5); const businessUnitsRoots: BusinessUnit[] = @@ -310,6 +323,7 @@ describe('Sourcing Data import', () => { const geoRegion: GeoRegion = await createGeoRegion({ isCreatedByUser: false, }); + const task: Task = await createTask(); await createAdminRegion({ isoA2: 'ABC', geoRegion, @@ -324,7 +338,7 @@ describe('Sourcing Data import', () => { await sourcingDataImportService.importSourcingData( __dirname + '/base-dataset.xlsx', - '', + task.id, ); const sourcingRecords: SourcingRecord[] = @@ -394,6 +408,8 @@ describe('Sourcing Data import', () => { missingDataFallbackPolicy = 'error'; const geoRegion: GeoRegion = await createGeoRegion(); + const task: Task = await createTask(); + await createAdminRegion({ isoA2: 'ABC', geoRegion, @@ -413,7 +429,7 @@ describe('Sourcing Data import', () => { try { await sourcingDataImportService.importSourcingData( __dirname + '/base-dataset-one-material.xlsx', - '', + task.id, ); } catch (err: any) { expect(err.message).toEqual( @@ -426,6 +442,8 @@ describe('Sourcing Data import', () => { missingDataFallbackPolicy = 'ignore'; const geoRegion: GeoRegion = await createGeoRegion(); + const task: Task = await createTask(); + await createAdminRegion({ isoA2: 'ABC', geoRegion, @@ -444,7 +462,7 @@ describe('Sourcing Data import', () => { await sourcingDataImportService.importSourcingData( __dirname + '/base-dataset.xlsx', - '', + task.id, ); const businessUnits: BusinessUnit[] = await businessUnitRepository.find(); @@ -478,6 +496,7 @@ describe('Sourcing Data import', () => { missingDataFallbackPolicy = 'fallback'; const geoRegion: GeoRegion = await createGeoRegion(); + const task: Task = await createTask(); await createAdminRegion({ isoA2: 'ABC', geoRegion, @@ -496,7 +515,7 @@ describe('Sourcing Data import', () => { await sourcingDataImportService.importSourcingData( __dirname + '/base-dataset.xlsx', - '', + task.id, ); const businessUnits: BusinessUnit[] = await businessUnitRepository.find();