Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TM-1531] delayed job with data #25

Merged
merged 19 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 166 additions & 20 deletions apps/job-service/src/jobs/delayed-jobs.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import { DelayedJobsController } from './delayed-jobs.controller';
import { Test, TestingModule } from '@nestjs/testing';
import { NotFoundException } from '@nestjs/common';
import { DelayedJobFactory } from '@terramatch-microservices/database/factories';
import { Resource } from '@terramatch-microservices/common/util';
import { DelayedJobsController } from './delayed-jobs.controller';
import { DelayedJob } from '@terramatch-microservices/database/entities';
import { DelayedJobBulkUpdateBodyDto } from './dto/delayed-job-update.dto';
import { v4 as uuidv4 } from 'uuid';
import { UnauthorizedException, NotFoundException, BadRequestException } from '@nestjs/common';

describe('DelayedJobsController', () => {
let controller: DelayedJobsController;

beforeEach(async () => {
await DelayedJob.destroy({
where: {},
truncate: true
});

const module: TestingModule = await Test.createTestingModule({
controllers: [DelayedJobsController]
}).compile();
Expand All @@ -17,23 +23,163 @@ describe('DelayedJobsController', () => {

afterEach(() => {
jest.restoreAllMocks();
})
});

describe('getRunningJobs', () => {
it('should return a list of running jobs for the authenticated user', async () => {
const authenticatedUserId = 130999;

const job = await DelayedJob.create({
uuid: uuidv4(),
createdBy: authenticatedUserId,
isAcknowledged: false,
status: 'completed',
});
const request = {
authenticatedUserId,
};

const result = await controller.getRunningJobs(request);

const data = Array.isArray(result.data) ? result.data : [result.data];

expect(data).toHaveLength(1);
expect(data[0].id).toBe(job.uuid);
});
it('should return an empty list when there are no running jobs', async () => {
const authenticatedUserId = 130999;
const request = { authenticatedUserId };

const result = await controller.getRunningJobs(request);
expect(result.data).toHaveLength(0);
});
});

it('should throw not found if the delayed job does not exist', async () => {
await expect(controller.findOne('asdf')).rejects
.toThrow(NotFoundException);
describe('findOne', () => {
it('should return a job by UUID', async () => {
const authenticatedUserId = 130999;
const job = await DelayedJob.create({
uuid: uuidv4(),
createdBy: authenticatedUserId,
isAcknowledged: false,
status: 'completed'
});

const result = await controller.findOne(job.uuid);
const jobData = Array.isArray(result.data) ? result.data[0] : result.data;
expect(jobData.id).toBe(job.uuid);
});
it('should throw NotFoundException when the job does not exist', async () => {
const nonExistentUuid = uuidv4();

await expect(controller.findOne(nonExistentUuid)).rejects.toThrow(NotFoundException);
});

});

it('should return the job definition when the delayed job does exist', async () => {
const { uuid, statusCode, payload, total_content, processed_content, proccess_message } = await DelayedJobFactory.create();
const result = await controller.findOne(uuid);
const resource = result.data as Resource;
expect(resource.type).toBe('delayedJobs');
expect(resource.id).toBe(uuid);
expect(resource.attributes.statusCode).toBe(statusCode);
expect(resource.attributes.payload).toMatchObject(payload);
expect(resource.attributes.total_content).toBe(total_content);
expect(resource.attributes.processed_content).toBe(processed_content);
expect(resource.attributes.proccess_message).toBe(proccess_message);
describe('bulkClearJobs', () => {
it('should successfully bulk update jobs to acknowledged', async () => {
const authenticatedUserId = 130999;
const job1 = await DelayedJob.create({
uuid: uuidv4(),
createdBy: authenticatedUserId,
isAcknowledged: false,
status: 'completed'
});
const job2 = await DelayedJob.create({
uuid: uuidv4(),
createdBy: authenticatedUserId,
isAcknowledged: false,
status: 'failed'
});

const payload: DelayedJobBulkUpdateBodyDto = {
data: [
{
type: 'delayedJobs',
uuid: job1.uuid,
attributes: { isAcknowledged: true }
},
{
type: 'delayedJobs',
uuid: job2.uuid,
attributes: { isAcknowledged: true }
}
]
};

const request = { authenticatedUserId };

const result = await controller.bulkClearJobs(payload, request);

expect(result.data).toHaveLength(2);
expect(result.data[0].id).toBe(job1.uuid);
expect(result.data[1].id).toBe(job2.uuid);

const updatedJob1 = await DelayedJob.findOne({ where: { uuid: job1.uuid } });
const updatedJob2 = await DelayedJob.findOne({ where: { uuid: job2.uuid } });
expect(updatedJob1.isAcknowledged).toBe(true);
expect(updatedJob2.isAcknowledged).toBe(true);
});

it('should throw BadRequestException when no jobs are provided', async () => {
const payload: DelayedJobBulkUpdateBodyDto = { data: [] };
const request = { authenticatedUserId: 130999 };

await expect(controller.bulkClearJobs(payload, request))
.rejects.toThrow(BadRequestException);
});

it('should throw NotFoundException for non-existent job', async () => {
const payload: DelayedJobBulkUpdateBodyDto = {
data: [
{
type: 'delayedJobs',
uuid: 'non-existent-uuid',
attributes: { isAcknowledged: true }
}
]
};
const request = { authenticatedUserId: 130999 };

await expect(controller.bulkClearJobs(payload, request))
.rejects.toThrow(NotFoundException);
});

it('should throw UnauthorizedException if no authenticated user id', async () => {
const payload: DelayedJobBulkUpdateBodyDto = {
data: [
{ type: 'delayedJobs', uuid: uuidv4(), attributes: { isAcknowledged: true } }
]
};

await expect(controller.bulkClearJobs(payload, { authenticatedUserId: null }))
.rejects.toThrow(UnauthorizedException);
});

it('should not update jobs with status "pending"', async () => {
const authenticatedUserId = 130999;
const pendingJob = await DelayedJob.create({
uuid: uuidv4(),
createdBy: authenticatedUserId,
isAcknowledged: false,
status: 'pending'
});

const payload: DelayedJobBulkUpdateBodyDto = {
data: [
{
type: 'delayedJobs',
uuid: pendingJob.uuid,
attributes: { isAcknowledged: true }
}
]
};
const request = { authenticatedUserId };

await expect(controller.bulkClearJobs(payload, request))
.rejects.toThrow(NotFoundException);
});

});
})
});
132 changes: 122 additions & 10 deletions apps/job-service/src/jobs/delayed-jobs.controller.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,150 @@
import { Controller, Get, NotFoundException, Param, UnauthorizedException } from '@nestjs/common';
import { Controller, Get, NotFoundException, Param, UnauthorizedException, Request, Patch, BadRequestException, Body, Logger } from '@nestjs/common';
import { ApiException } from '@nanogiants/nestjs-swagger-api-exception-decorator';
import { ApiOperation } from '@nestjs/swagger';
import { ApiBody, ApiOperation } from '@nestjs/swagger';
import { Op } from 'sequelize';
import { JsonApiResponse } from '@terramatch-microservices/common/decorators';
import {
buildJsonApi,
JsonApiDocument,
} from '@terramatch-microservices/common/util';
import { DelayedJobDto } from './dto/delayed-job.dto';
import { DelayedJob } from '@terramatch-microservices/database/entities';
import { DelayedJobBulkUpdateBodyDto } from './dto/delayed-job-update.dto';

@Controller('jobs/v3/delayedJobs')
export class DelayedJobsController {
@Get()
@ApiOperation({
operationId: 'listDelayedJobs',
description: 'Retrieve a list of all delayed jobs.',
})
@JsonApiResponse({ data: { type: DelayedJobDto } })
@ApiException(() => UnauthorizedException, {
description: 'Authentication failed.',
})
async getRunningJobs(
@Request() { authenticatedUserId }
): Promise<JsonApiDocument> {
const runningJobs = await DelayedJob.findAll({
where: {
isAcknowledged: false,
createdBy: authenticatedUserId
},
order: [['createdAt', 'DESC']],
});

const document = buildJsonApi();
runningJobs.forEach((job) => {
document.addData(job.uuid, new DelayedJobDto(job));
});
return document.serialize();
}

@Get(':uuid')
@ApiOperation({
operationId: 'delayedJobsFind',
description: 'Get the current status and potentially payload or error from a delayed job.',
description:
'Get the current status and potentially payload or error from a delayed job.',
})
@JsonApiResponse({ data: { type: DelayedJobDto } })
@ApiException(() => UnauthorizedException, {
description: 'Authentication failed.',
})
@ApiException(() => NotFoundException, {
description: 'Job with that UUID not found.'
description: 'Job with that UUID not found.',
})
// Note: Since jobs are very generic and we don't track which resources are related to a given
// job, there is no effective way to make a policy for jobs until we expand the service to
// include an owner ID on the job table.
async findOne(@Param('uuid') pathUUID: string): Promise<JsonApiDocument> {
const job = await DelayedJob.findOne({ where: { uuid: pathUUID }});
const job = await DelayedJob.findOne({ where: { uuid: pathUUID } });
if (job == null) throw new NotFoundException();

// Note: Since jobs are very generic and we don't track which resources are related to a given
// job, there is no effective way to make a policy for jobs until we expand the service to
// include an owner ID on the job table.

roguenet marked this conversation as resolved.
Show resolved Hide resolved
return buildJsonApi()
.addData(pathUUID, new DelayedJobDto(job))
.document.serialize();
}
}

@Patch('bulk-clear')
@ApiOperation({
operationId: 'bulkClearJobs',
summary: 'Bulk update jobs to modify isAcknowledged for specified job IDs',
roguenet marked this conversation as resolved.
Show resolved Hide resolved
description: `Accepts a JSON:API-compliant payload to bulk update jobs, allowing each job's isAcknowledged attribute to be set to true or false.`,
})
@ApiBody({
description: 'JSON:API bulk update payload for jobs',
roguenet marked this conversation as resolved.
Show resolved Hide resolved
type: DelayedJobBulkUpdateBodyDto,
examples: {
example: {
value: {
data: [
{
type: 'jobs',
uuid: 'uuid-1',
attributes: {
isAcknowledged: true,
},
},
{
type: 'jobs',
uuid: 'uuid-2',
attributes: {
isAcknowledged: false,
},
},
],
},
},
},
})
@JsonApiResponse({ data: { type: DelayedJobDto } })
@ApiException(() => UnauthorizedException, { description: 'Authentication failed.' })
@ApiException(() => BadRequestException, { description: 'Invalid payload or IDs provided.' })
@ApiException(() => NotFoundException, { description: 'One or more jobs specified in the payload could not be found.' })
async bulkClearJobs(
@Body() bulkClearJobsDto: DelayedJobBulkUpdateBodyDto,
@Request() { authenticatedUserId }
): Promise<JsonApiDocument> {
const jobUpdates = bulkClearJobsDto.data;

if (!jobUpdates || jobUpdates.length === 0) {
throw new BadRequestException('No jobs provided in the payload.');
}

roguenet marked this conversation as resolved.
Show resolved Hide resolved
if (!authenticatedUserId) {
throw new UnauthorizedException('Authentication failed.');
}
const updatePromises = jobUpdates.map(async (job) => {
roguenet marked this conversation as resolved.
Show resolved Hide resolved
const [updatedCount] = await DelayedJob.update(
{ isAcknowledged: job.attributes.isAcknowledged },
{
where: {
uuid: job.uuid,
createdBy: authenticatedUserId,
status: { [Op.ne]: 'pending' },
},
});

if (updatedCount === 0) {
throw new NotFoundException(`Job with UUID ${job.uuid} could not be updated.`);
}

const updatedJob = await DelayedJob.findOne({
where: { uuid: job.uuid },
});

return updatedJob;
});

roguenet marked this conversation as resolved.
Show resolved Hide resolved
const updatedJobs = await Promise.all(updatePromises);


const jsonApiBuilder = buildJsonApi();
updatedJobs.forEach((job) => {
jsonApiBuilder.addData(job.uuid, new DelayedJobDto(job));
});

return jsonApiBuilder.serialize();

}
roguenet marked this conversation as resolved.
Show resolved Hide resolved
}
31 changes: 31 additions & 0 deletions apps/job-service/src/jobs/dto/delayed-job-update.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { ApiProperty } from '@nestjs/swagger';
import { IsArray, IsBoolean, IsUUID, ValidateNested } from 'class-validator';
import { Type } from 'class-transformer';

class DelayedJobAttributes {
@IsBoolean()
@ApiProperty({ description: 'Value to set for isAcknowledged', example: true })
isAcknowledged: boolean;
}

export class DelayedJobData {
@ApiProperty({ enum: ['delayedJobs'], description: 'Type of the resource', example: 'delayedJobs' })
type: 'delayedJobs';

@IsUUID()
@ApiProperty({ format: 'uuid', description: 'UUID of the job', example: 'uuid-1' })
uuid: string;

@ValidateNested()
@Type(() => DelayedJobAttributes)
@ApiProperty({ description: 'Attributes to update for the job', type: DelayedJobAttributes })
attributes: DelayedJobAttributes;
}

export class DelayedJobBulkUpdateBodyDto {
@IsArray()
@ValidateNested({ each: true })
@Type(() => DelayedJobData)
@ApiProperty({ description: 'List of jobs to update isAcknowledged', type: [DelayedJobData] })
data: DelayedJobData[];
}
Loading
Loading