Skip to content

Commit

Permalink
feat(release): update b1a357c8
Browse files Browse the repository at this point in the history
  • Loading branch information
fedellen committed Aug 30, 2024
1 parent fd1032e commit 709d619
Show file tree
Hide file tree
Showing 54 changed files with 2,310 additions and 443 deletions.
3 changes: 2 additions & 1 deletion .mocharc.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module.exports = {
extension: ["ts"],
require: ["ts-node/register/transpile-only", "tests/testSetup.ts"],
timeout: "20000", // 20 seconds
parallel: true,
parallel: false,
exit: true,
recursive: true,
};
8 changes: 8 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
"bignumber",
"blocklisted",
"bundlr",
"cosmjs",
"Cosmoshub",
"dataitem",
"ethersproject",
"indep",
Expand All @@ -24,7 +26,10 @@
"INJECTEDAPTOS",
"Irys",
"knexfile",
"kyve",
"kyvejs",
"localstack",
"MAXVALUE",
"MULTIAPTOS",
"multistream",
"NOWAIT",
Expand All @@ -36,12 +41,15 @@
"otel",
"otlp",
"outdir",
"privkey",
"Readables",
"schemaname",
"Secp",
"solana",
"sslmode",
"svcs",
"tablename",
"tendermint",
"terragrunt",
"throughputs",
"trivago",
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ services:
- upload-service-pg

upload-service-pg:
image: postgres:13.8
image: postgres:16.1
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
Expand Down
57 changes: 34 additions & 23 deletions ecs/fulfillment-pipeline/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { Message } from "@aws-sdk/client-sqs";
import { Message, SQSClient } from "@aws-sdk/client-sqs";
import Koa from "koa";
import Router from "koa-router";
import * as promClient from "prom-client";
import { Consumer } from "sqs-consumer";
import { Logger } from "winston";

import { ArweaveGateway } from "../../../src/arch/arweaveGateway";
import { PostgresDatabase } from "../../../src/arch/db/postgres";
import { TurboPaymentService } from "../../../src/arch/payment";
import { migrateOnStartup } from "../../../src/constants";
import { jobLabels, migrateOnStartup } from "../../../src/constants";
import { postBundleHandler } from "../../../src/jobs/post";
import { prepareBundleHandler } from "../../../src/jobs/prepare";
import { seedBundleHandler } from "../../../src/jobs/seed";
Expand All @@ -39,7 +40,10 @@ import { createUnbundleBDIQueueConsumer } from "./jobs/unbundleBdi";
import { VerifyBundleJobScheduler } from "./jobs/verify";
import { JobScheduler } from "./utils/jobScheduler";
import { createPlanIdHandlingSQSConsumer } from "./utils/planIdMessageHandler";
import { QueueHandlerConfig } from "./utils/queueHandlerConfig";
import {
QueueHandlerConfig,
defaultSQSOptions,
} from "./utils/queueHandlerConfig";

// let otelExporter: OTELExporter | undefined; // eslint-disable-line

Expand Down Expand Up @@ -81,11 +85,11 @@ const paymentService = new TurboPaymentService();
const arweaveGateway = new ArweaveGateway();

// Set up queue handler configurations for jobs based on a planId
export const queues: QueueHandlerConfig[] = [
const planIdQueueHandlerConfigs: QueueHandlerConfig[] = [
{
queueUrl: prepareBundleQueueUrl,
jobName: jobLabels.prepareBundle,
handler: prepareBundleHandler,
logger: globalLogger.child({ queue: "prepare-bundle" }),
consumerOptions: {
pollingWaitTimeMs: 1000,
visibilityTimeout: 360,
Expand All @@ -94,17 +98,17 @@ export const queues: QueueHandlerConfig[] = [
},
{
queueUrl: postBundleQueueUrl,
jobName: jobLabels.postBundle,
handler: postBundleHandler,
logger: globalLogger.child({ queue: "post-bundle" }),
consumerOptions: {
pollingWaitTimeMs: 1000,
visibilityTimeout: 90,
},
},
{
queueUrl: seedBundleQueueUrl,
jobName: jobLabels.seedBundle,
handler: seedBundleHandler,
logger: globalLogger.child({ queue: "seed-bundle" }),
consumerOptions: {
pollingWaitTimeMs: 10,
visibilityTimeout: 360,
Expand All @@ -113,18 +117,22 @@ export const queues: QueueHandlerConfig[] = [
},
];

type ConsumerQueue = QueueHandlerConfig & { consumer: Consumer };

const consumers: ConsumerQueue[] = queues.map((queue) => ({
consumer: createPlanIdHandlingSQSConsumer({
queue,
database: uploadDatabase,
objectStore,
paymentService,
arweaveGateway,
}),
...queue,
}));
type ConsumerQueue = { consumer: Consumer; logger: Logger };

const consumers: ConsumerQueue[] = planIdQueueHandlerConfigs.map((queue) => {
const logger = globalLogger.child({ queue: queue.jobName });
return {
logger,
consumer: createPlanIdHandlingSQSConsumer({
logger,
queue,
database: uploadDatabase,
objectStore,
paymentService,
arweaveGateway,
}),
};
});

let shouldExit = false;
let numInflightMessages = 0;
Expand Down Expand Up @@ -237,6 +245,8 @@ type ConsumerProvisioningConfig = {
friendlyQueueName: string;
};

const sqsClient = new SQSClient(defaultSQSOptions);

const consumerQueues: ConsumerProvisioningConfig[] = [
{
envVarCountStr: process.env.NUM_FINALIZE_UPLOAD_CONSUMERS,
Expand All @@ -248,13 +258,13 @@ const consumerQueues: ConsumerProvisioningConfig[] = [
objectStore,
paymentService,
}),
friendlyQueueName: "finalize-upload",
friendlyQueueName: jobLabels.finalizeUpload,
},
{
envVarCountStr: process.env.NUM_OPTICAL_CONSUMERS,
defaultCount: 3,
createConsumerQueueFn: () => createOpticalConsumerQueue(globalLogger),
friendlyQueueName: "optical",
friendlyQueueName: jobLabels.opticalPost,
},
{
envVarCountStr: process.env.NUM_NEW_DATA_ITEM_INSERT_CONSUMERS,
Expand All @@ -263,14 +273,15 @@ const consumerQueues: ConsumerProvisioningConfig[] = [
createNewDataItemBatchInsertQueue({
database: uploadDatabase,
logger: globalLogger,
sqsClient,
}),
friendlyQueueName: "new-data-item",
friendlyQueueName: jobLabels.newDataItem,
},
{
envVarCountStr: process.env.NUM_UNBUNDLE_BDI_CONSUMERS,
defaultCount: 1,
createConsumerQueueFn: () => createUnbundleBDIQueueConsumer(globalLogger),
friendlyQueueName: "unbundle-bdi",
friendlyQueueName: jobLabels.unbundleBdi,
},
];

Expand Down
42 changes: 22 additions & 20 deletions ecs/fulfillment-pipeline/src/jobs/finalize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@ import { Database } from "../../../../src/arch/db/database";
import { ObjectStore } from "../../../../src/arch/objectStore";
import { PaymentService } from "../../../../src/arch/payment";
import { getQueueUrl } from "../../../../src/arch/queues";
import { gatewayUrl } from "../../../../src/constants";
import { gatewayUrl, jobLabels } from "../../../../src/constants";
import { finalizeMultipartUploadWithQueueMessage } from "../../../../src/routes/multiPartUploads";
import {
DataItemExistsWarning,
InsufficientBalance,
} from "../../../../src/utils/errors";
import { getArweaveWallet } from "../../../../src/utils/getArweaveWallet";
import {
defaultSQSOptions,
stubQueueHandler,
} from "../utils/queueHandlerConfig";
import { fulfillmentJobHandler } from "../utils/jobHandler";
import { defaultSQSOptions } from "../utils/queueHandlerConfig";

export function createFinalizeUploadConsumerQueue({
logger,
Expand All @@ -46,8 +44,10 @@ export function createFinalizeUploadConsumerQueue({
objectStore: ObjectStore;
paymentService: PaymentService;
}) {
const finalizeUploadQueueUrl = getQueueUrl("finalize-upload");
const finalizeUploadLogger = logger.child({ queue: "finalize-upload" });
const finalizeUploadQueueUrl = getQueueUrl(jobLabels.finalizeUpload);
const finalizeUploadLogger = logger.child({
queue: jobLabels.finalizeUpload,
});
return {
consumer: Consumer.create({
queueUrl: finalizeUploadQueueUrl,
Expand All @@ -59,17 +59,21 @@ export function createFinalizeUploadConsumerQueue({
}
);
try {
await finalizeMultipartUploadWithQueueMessage({
message,
logger: finalizeUploadLogger,
objectStore,
paymentService,
database,
getArweaveWallet,
arweaveGateway: new ArweaveGateway({
endpoint: gatewayUrl,
}),
});
await fulfillmentJobHandler(
() =>
finalizeMultipartUploadWithQueueMessage({
message,
logger: finalizeUploadLogger,
objectStore,
paymentService,
database,
getArweaveWallet,
arweaveGateway: new ArweaveGateway({
endpoint: gatewayUrl,
}),
}),
jobLabels.finalizeUpload
);
} catch (error) {
if (error instanceof DataItemExistsWarning) {
finalizeUploadLogger.warn("Data item already exists", {
Expand All @@ -91,8 +95,6 @@ export function createFinalizeUploadConsumerQueue({
visibilityTimeout: 30,
pollingWaitTimeMs: 500,
}),
queueUrl: finalizeUploadQueueUrl, // unused
handler: stubQueueHandler, // unused
logger: finalizeUploadLogger,
};
}
Loading

0 comments on commit 709d619

Please sign in to comment.