Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api-serverless-cms): request cloning #4422

Draft
wants to merge 8 commits into
base: next
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/api-log/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"@webiny/plugins": "0.0.0",
"@webiny/tasks": "0.0.0",
"@webiny/utils": "0.0.0",
"zod": "^3.22.4"
"zod": "^3.23.8"
},
"devDependencies": {
"@webiny/cli": "0.0.0",
Expand Down
8 changes: 6 additions & 2 deletions packages/api-serverless-cms/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@
"@webiny/api-prerendering-service": "0.0.0",
"@webiny/api-security": "0.0.0",
"@webiny/api-tenancy": "0.0.0",
"@webiny/aws-sdk": "0.0.0",
"@webiny/error": "0.0.0",
"@webiny/handler": "0.0.0",
"@webiny/handler-client": "0.0.0",
"@webiny/handler-graphql": "0.0.0"
"@webiny/handler-graphql": "0.0.0",
"@webiny/utils": "0.0.0",
"date-fns": "^2.22.1",
"zod": "^3.23.8"
},
"devDependencies": {
"@webiny/api-admin-users": "0.0.0",
Expand All @@ -38,7 +43,6 @@
"@webiny/api-wcp": "0.0.0",
"@webiny/api-websockets": "0.0.0",
"@webiny/cli": "0.0.0",
"@webiny/handler": "0.0.0",
"@webiny/handler-aws": "0.0.0",
"@webiny/plugins": "0.0.0",
"@webiny/project-utils": "0.0.0",
Expand Down
1 change: 1 addition & 0 deletions packages/api-serverless-cms/src/enterprise/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./requestCloning";
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./requestCloning";
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { ServiceDiscovery } from "@webiny/api";
import zod from "zod";

export interface IOptions {
sqsRegion: string;
sqsUrl: string;
s3Region: string;
s3Bucket: string;
}

interface IManifestTwoPhasedDeployment {
isPrimary: boolean;
s3Region: string;
s3Bucket: string;
sqsRegion: string;
sqsUrl: string;
}

interface IManifest {
twoPhasedDeployment: IManifestTwoPhasedDeployment;
}

const optionsValidation = zod.object({
twoPhasedDeployment: zod.object({
isPrimary: zod.boolean(),
s3Region: zod.string(),
s3Bucket: zod.string(),
sqsRegion: zod.string(),
sqsUrl: zod.string()
})
});

export const getOptions = async (): Promise<IOptions | null> => {
const manifest = await ServiceDiscovery.load<IManifest>();
if (!manifest) {
console.error("Service manifest not found.");
return null;
}

const validated = await optionsValidation.safeParseAsync(manifest);
if (!validated.success || !validated.data) {
console.error("Service manifest is not valid.");
console.log(validated.error);
return null;
}

const { twoPhasedDeployment } = validated.data;

if (!twoPhasedDeployment?.isPrimary) {
return null;
}
return twoPhasedDeployment;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { compress } from "@webiny/utils/compression/gzip";
import type { ISQSTransfer } from "../sqs/types";
import type { IS3Transfer, IS3TransferSendResult } from "../s3/types";
import type {
IRequestTransfer,
IRequestTransferSendParams,
IRequestTransferSendResult
} from "./types";
import { format as formatDate } from "date-fns";

/**
* There are few steps we must go through:
* 1. compress the request body
* 2. store the compressed request body in S3
* 3. send a message to SQS with the location of the compressed request body in S3
*/

export interface ITransferRequestParams {
s3: IS3Transfer;
sqs: ISQSTransfer;
}

export class RequestTransfer implements IRequestTransfer {
private readonly s3: IS3Transfer;
private readonly sqs: ISQSTransfer;

public constructor(params: ITransferRequestParams) {
this.s3 = params.s3;
this.sqs = params.sqs;
}

public async send(params: IRequestTransferSendParams): Promise<IRequestTransferSendResult> {
const { event } = params;
const body = await compress(JSON.stringify(event));

const key = this.createRequestTransferKey(params);

let result: IS3TransferSendResult;
try {
result = await this.s3.send({
key,
body
});
} catch (ex) {
console.error("Failed to store the request in S3.");
console.log(ex);
return;
}

try {
await this.sqs.send({
body: JSON.stringify({
type: result.type,
value: result.key
}),
attributes: [
{
name: "ETag",
value: result.eTag
}
],
groupId: "systemAToSystemB"
});
} catch (ex) {
console.error("Failed to send a message to SQS.");
console.log(ex);
}
}

private createRequestTransferKey(params: Pick<IRequestTransferSendParams, "key">): string {
const formattedDate = formatDate(new Date(), "yyyy/MM/dd");
const formattedTime = formatDate(new Date(), "HH:mm:ss");
return `requests/${formattedDate}/${formattedTime}/${params.key}.json.gz`;
}
}

export const createRequestTransfer = (params: ITransferRequestParams): IRequestTransfer => {
return new RequestTransfer(params);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./RequestTransfer";
export * from "./types";
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { APIGatewayEvent } from "@webiny/handler-aws/types";

export interface IRequestTransferSendParams {
key: string;
event: APIGatewayEvent;
}

export type IRequestTransferSendResult = void;

export interface IRequestTransfer {
send(params: IRequestTransferSendParams): Promise<IRequestTransferSendResult>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { createModifyFastifyPlugin } from "@webiny/handler";
import { createSqsClient } from "@webiny/aws-sdk/client-sqs";
import { createS3Client } from "@webiny/aws-sdk/client-s3";
import { getOptions } from "./options";
import { createCacheKey } from "@webiny/utils";
import { createS3Transfer } from "./s3";
import { createSQSTransfer } from "./sqs";
import { createRequestTransfer } from "./request";

export const requestCloning = () => {
return createModifyFastifyPlugin(instance => {
instance.addHook("onRequest", async request => {
// @ts-expect-error
request.cloning = (async () => {
/**
* We will not transfer OPTIONS request. Everything else can go into another system.
*/
if (request.method.toLowerCase() === "options") {
return;
}
const event = request.awsLambda?.event;
if (!event) {
console.error(`There is no event to be transferred into another system.`);
return;
}
const lambdaContext = request.awsLambda?.context;
if (!lambdaContext) {
console.error(`There is no context to be transferred into another system.`);
return;
}
const options = await getOptions();
if (!options) {
/**
* If no options, either there is some error in the options validation or this system is not primary.
* In both cases, we will skip the transfer.
*/
return;
}

const s3Transfer = createS3Transfer({
client: createS3Client({
region: options.s3Region
}),
bucket: options.s3Bucket
});
const sqsTransfer = createSQSTransfer({
client: createSqsClient({
region: options.sqsRegion
}),
url: options.sqsUrl
});

const requestTransfer = createRequestTransfer({
s3: s3Transfer,
sqs: sqsTransfer
});

const key = createCacheKey(event, {
algorithm: "sha512"
});

try {
await requestTransfer.send({
key,
event
});
} catch (ex) {
console.error("Failed to transfer the request to another system.");
console.log(ex);
throw ex;
}
})();
});
});
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { IS3Transfer, IS3TransferSendParams, IS3TransferSendResult } from "./types";
import {
PutObjectCommand,
PutObjectCommandInput,
PutObjectCommandOutput,
S3Client
} from "@webiny/aws-sdk/client-s3";
import { TRANSFER_TYPE_S3 } from "./constants";

export interface IS3TransferParams {
client: S3Client;
bucket: string;
}

export class S3Transfer implements IS3Transfer {
private readonly client: S3Client;
private readonly bucket: string;

public constructor(params: IS3TransferParams) {
this.client = params.client;
this.bucket = params.bucket;
}

public async send(params: IS3TransferSendParams): Promise<IS3TransferSendResult> {
let s3Result: PutObjectCommandOutput;
try {
const input: PutObjectCommandInput = {
ACL: "private",
Bucket: this.bucket,
Key: params.key,
Body: params.body
};
const cmd = new PutObjectCommand(input);
s3Result = await this.client.send(cmd);
} catch (ex) {
console.error("Failed to store the request in S3.");
console.log(ex);
throw ex;
}

const statusCode = s3Result.$metadata?.httpStatusCode;
const eTag = s3Result.ETag;

if (statusCode !== 200 || !eTag) {
const message = `Failed to store the request in S3. Key: ${params.key}`;
console.error(message);
throw new Error(message);
}

return {
type: TRANSFER_TYPE_S3,
key: params.key,
eTag,
statusCode
};
}
}

export const createS3Transfer = (params: IS3TransferParams): IS3Transfer => {
return new S3Transfer(params);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const TRANSFER_TYPE_S3 = "s3";
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./S3Transfer";
export * from "./types";
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export interface IS3TransferSendParams {
key: string;
body: string | Buffer;
}
export interface IS3TransferSendResult {
type: string;
key: string;
eTag: string;
statusCode: 200 | unknown;
}
export interface IS3Transfer {
send(params: IS3TransferSendParams): Promise<IS3TransferSendResult>;
}
Loading
Loading