feat(release): update 994ac64

fedellen committed Jun 26, 2024
1 parent 6299f54 commit 7f005fc
Showing 14 changed files with 326 additions and 347 deletions.
4 changes: 3 additions & 1 deletion docker-compose.yml
@@ -10,7 +10,7 @@ services:
DB_HOST: upload-service-pg
DB_PORT: 5432
DB_PASSWORD: postgres
PAYMENT_SERVICE_BASE_URL: ${PAYMENT_SERVICE_BASE_URL:-payment.ardrive.dev}
PAYMENT_SERVICE_BASE_URL: ${PAYMENT_SERVICE_BASE_URL:-}
MAX_DATA_ITEM_SIZE: ${MAX_DATA_ITEM_SIZE:-10737418240}
ALLOW_LISTED_ADDRESSES: ${ALLOW_LISTED_ADDRESSES:-}
AWS_ENDPOINT: ${AWS_ENDPOINT:-}
@@ -68,6 +68,8 @@ services:
S3_FORCE_PATH_STYLE: ${S3_FORCE_PATH_STYLE:-}
AWS_REGION: ${AWS_REGION:-us-east-1}
OVERDUE_DATA_ITEM_THRESHOLD_MS: ${OVERDUE_DATA_ITEM_THRESHOLD_MS:-0} # plan data items immediately into bundles when plan-bundle runs
ports:
- "${FULFILLMENT_PORT:-4000}:${FULFILLMENT_PORT:-4000}"

depends_on:
localstack:
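One detail worth calling out: with the new default, ${PAYMENT_SERVICE_BASE_URL:-} expands to an empty string inside the container rather than leaving the variable unset, and the service code in src/arch/payment.ts (below) treats that empty string the same as "not configured". A minimal TypeScript sketch of that interaction, assuming the compose defaults above:

// Sketch only: how an empty compose default is interpreted by the service.
const rawBaseUrl = process.env.PAYMENT_SERVICE_BASE_URL; // "" when no value is supplied to compose
// An empty string is falsy, so the resolved URL becomes undefined and
// balance checks are simulated instead of calling the payment service.
const paymentServiceURL = rawBaseUrl ? `https://${rawBaseUrl}` : undefined;
console.log(paymentServiceURL); // undefined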
35 changes: 34 additions & 1 deletion ecs/fulfillment-pipeline/src/app.ts
@@ -15,6 +15,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { Message } from "@aws-sdk/client-sqs";
import Koa from "koa";
import Router from "koa-router";
import * as promClient from "prom-client";
import { Consumer } from "sqs-consumer";

import { ArweaveGateway } from "../../../src/arch/arweaveGateway";
@@ -25,6 +28,7 @@ import { postBundleHandler } from "../../../src/jobs/post";
import { prepareBundleHandler } from "../../../src/jobs/prepare";
import { seedBundleHandler } from "../../../src/jobs/seed";
import globalLogger from "../../../src/logger";
import { MetricRegistry } from "../../../src/metricRegistry";
import { loadConfig } from "../../../src/utils/config";
import { getS3ObjectStore } from "../../../src/utils/objectStoreUtils";
import { createFinalizeUploadConsumerQueue } from "./jobs/finalize";
@@ -236,7 +240,7 @@ type ConsumerProvisioningConfig = {
const consumerQueues: ConsumerProvisioningConfig[] = [
{
envVarCountStr: process.env.NUM_FINALIZE_UPLOAD_CONSUMERS,
defaultCount: 10,
defaultCount: 2,
createConsumerQueueFn: () =>
createFinalizeUploadConsumerQueue({
logger: globalLogger,
@@ -321,3 +325,32 @@ if (process.env.VERIFY_BUNDLE_ENABLED === "true") {
});
setUpAndStartJobScheduler(verifyBundleJobScheduler);
}

const app = new Koa();
const router = new Router();

const metricsRegistry = MetricRegistry.getInstance().getRegistry();
promClient.collectDefaultMetrics({ register: metricsRegistry });

// Prometheus
router.get(["/fulfillment_metrics", "metrics"], async (ctx, next) => {
ctx.body = await metricsRegistry.metrics();
return next();
});

// Health check
router.get(["/health", "/"], (ctx, next) => {
ctx.body = "OK";
return next();
});
const port = +(process.env.FULFILLMENT_PORT ?? process.env.PORT ?? 4000);
app.use(router.routes());
const server = app.listen(port);

globalLogger.info(
`Fulfillment pipeline service started with node environment ${process.env.NODE_ENV} on port ${port}...`
);

server.on("error", (error) => {
globalLogger.error("Server error", error);
});
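Once the fulfillment pipeline container is up, the new routes can be exercised directly. A hedged sketch using Node 18+ global fetch, assuming the default port of 4000 and a local deployment (the base URL is an assumption, not part of the commit):

// Illustrative smoke check for the new health and metrics routes.
async function checkFulfillmentService(baseUrl = "http://localhost:4000"): Promise<void> {
  const health = await fetch(`${baseUrl}/health`);
  console.log("health:", health.status, await health.text()); // expect: 200 OK

  const metrics = await fetch(`${baseUrl}/fulfillment_metrics`);
  const body = await metrics.text();
  console.log(body.split("\n").slice(0, 5).join("\n")); // first few Prometheus metric lines
}

checkFulfillmentService().catch((err) => console.error("service not reachable", err));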
5 changes: 3 additions & 2 deletions ecs/fulfillment-pipeline/src/jobs/finalize.ts
@@ -73,8 +73,9 @@ export function createFinalizeUploadConsumerQueue({
} catch (error) {
if (error instanceof DataItemExistsWarning) {
finalizeUploadLogger.warn("Data item already exists", {
error,
message,
error: error.message,
messageId: message.MessageId,
messageBody: message.Body,
});
return;
} else if (error instanceof InsufficientBalance) {
2 changes: 1 addition & 1 deletion package.json
@@ -77,7 +77,7 @@
"typescript": "^4.7.4"
},
"dependencies": {
"@ardrive/ardrive-promise-cache": "^1.1.4",
"@ardrive/ardrive-promise-cache": "^1.3.0",
"@aws-sdk/abort-controller": "^3.303.0",
"@aws-sdk/client-s3": "^3.367.0",
"@aws-sdk/client-secrets-manager": "^3.363.0",
6 changes: 3 additions & 3 deletions src/arch/db/database.ts
@@ -152,7 +152,7 @@ export interface Database {
}: {
uploadId: UploadId;
uploadKey: string;
}): Promise<void>;
}): Promise<InFlightMultiPartUpload>;
finalizeMultiPartUpload(params: {
uploadId: UploadId;
etag: string;
@@ -180,8 +180,8 @@ export interface Database {
): Promise<FinishedMultiPartUpload>;
updateMultipartChunkSize(
chunkSize: number,
uploadId: UploadId
): Promise<void>;
upload: InFlightMultiPartUpload
): Promise<number>;

updatePlannedDataItemAsFailed(params: {
dataItemId: DataItemId;
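For callers, the practical difference is in the return values: insertInFlightMultiPartUpload now yields the in-flight entity and updateMultipartChunkSize takes that entity and reports the chunk size actually recorded. A hedged caller sketch (the function, variable names, and import path are illustrative, not code from the repository):

import { Database, UploadId } from "./database"; // import path assumed for illustration

async function beginMultipartUpload(db: Database, uploadId: UploadId, uploadKey: string) {
  // Previously Promise<void>; now the inserted entity comes back to the caller.
  const inFlight = await db.insertInFlightMultiPartUpload({ uploadId, uploadKey });

  // Previously keyed by uploadId and returned void; now it takes the entity
  // and returns the chunk size that ended up stored.
  const appliedChunkSize = await db.updateMultipartChunkSize(10 * 1024 * 1024, inFlight);
  return { inFlight, appliedChunkSize };
}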
121 changes: 78 additions & 43 deletions src/arch/db/postgres.ts
@@ -36,7 +36,6 @@ import {
FinishedMultiPartUploadDBInsert,
FinishedMultiPartUploadDBResult,
InFlightMultiPartUpload,
InFlightMultiPartUploadDBInsert,
InFlightMultiPartUploadDBResult,
InFlightMultiPartUploadParams,
InsertNewBundleParams,
@@ -968,18 +967,27 @@ export class PostgresDatabase implements Database {
public async insertInFlightMultiPartUpload({
uploadId,
uploadKey,
}: InFlightMultiPartUploadParams): Promise<void> {
}: InFlightMultiPartUploadParams): Promise<InFlightMultiPartUpload> {
this.log.debug("Inserting in flight multipart upload...", {
uploadId,
uploadKey,
});

return this.writer.transaction(async (knexTransaction) => {
await knexTransaction(tableNames.inFlightMultiPartUpload).insert({
upload_id: uploadId,
upload_key: uploadKey,
});
const result = await this.writer.transaction(async (knexTransaction) => {
const [insertedRow] =
await knexTransaction<InFlightMultiPartUploadDBResult>(
tableNames.inFlightMultiPartUpload
)
.insert({
upload_id: uploadId,
upload_key: uploadKey,
})
.returning("*"); // Returning the inserted row

return insertedRow;
});

return entityToInFlightMultiPartUpload(result);
}

public async finalizeMultiPartUpload({
@@ -1042,18 +1050,7 @@
throw new MultiPartUploadNotFound(uploadId);
}

return {
uploadId: inFlightUpload.upload_id,
uploadKey: inFlightUpload.upload_key,
createdAt: inFlightUpload.created_at,
expiresAt: inFlightUpload.expires_at,
chunkSize: inFlightUpload.chunk_size
? +inFlightUpload.chunk_size
: undefined,
failedReason: isMultipartUploadFailedReason(inFlightUpload.failed_reason)
? inFlightUpload.failed_reason
: undefined,
};
return entityToInFlightMultiPartUpload(inFlightUpload);
}

public async failInflightMultiPartUpload({
@@ -1093,20 +1090,7 @@
`Deleted ${numDeletedRows} in flight uploads past their expired dates.`
);

return {
uploadId: updatedInFlightUpload.upload_id,
uploadKey: updatedInFlightUpload.upload_key,
createdAt: updatedInFlightUpload.created_at,
expiresAt: updatedInFlightUpload.expires_at,
chunkSize: updatedInFlightUpload.chunk_size
? +updatedInFlightUpload.chunk_size
: undefined,
failedReason: isMultipartUploadFailedReason(
updatedInFlightUpload.failed_reason
)
? updatedInFlightUpload.failed_reason
: undefined,
};
return entityToInFlightMultiPartUpload(updatedInFlightUpload);
});
}

Expand Down Expand Up @@ -1203,20 +1187,56 @@ export class PostgresDatabase implements Database {

public async updateMultipartChunkSize(
chunkSize: number,
uploadId: UploadId
): Promise<void> {
upload: InFlightMultiPartUpload
): Promise<number> {
this.log.debug("Updating multipart chunk size...", {
chunkSize,
});

await this.writer<InFlightMultiPartUploadDBInsert>(
tableNames.inFlightMultiPartUpload
)
.update({
chunk_size: chunkSize.toString(),
})
.where({ upload_id: uploadId })
.forUpdate();
// TODO: This will be brittle if we add more columns to the inFlightMultiPartUpload table
const { uploadId, uploadKey, createdAt, expiresAt, failedReason } = upload;
const chunkSizeStr = chunkSize.toString();

// Use a CAS upsert to ensure we only update the chunk size if it's larger than the current value
// This assists in cases where the last chunk might have been processed first
const bestKnownChunkSize = await this.writer.transaction(async (trx) => {
const query = `
INSERT INTO ${tableNames.inFlightMultiPartUpload} (upload_id, upload_key, created_at, expires_at, chunk_size, failed_reason)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (upload_id) DO UPDATE SET
chunk_size = CASE
WHEN ${tableNames.inFlightMultiPartUpload}.chunk_size IS NULL OR ${tableNames.inFlightMultiPartUpload}.chunk_size::bigint < EXCLUDED.chunk_size::bigint
THEN EXCLUDED.chunk_size
ELSE ${tableNames.inFlightMultiPartUpload}.chunk_size
END
RETURNING chunk_size;
`;

const result = await trx.raw(query, [
uploadId,
uploadKey,
createdAt,
expiresAt,
chunkSizeStr,
failedReason || null,
]);
return result.rows[0].chunk_size;
});

if (bestKnownChunkSize !== chunkSizeStr) {
this.log.warn(
"Chunk size not updated because current size is larger or equal.",
{
currentChunkSize: bestKnownChunkSize,
attemptedChunkSize: chunkSize,
}
);
} else {
this.log.debug("Chunk size updated successfully.", {
chunkSize,
});
}
return +bestKnownChunkSize;
}

public async updatePlannedDataItemAsFailed({
@@ -1256,3 +1276,18 @@ function isMultipartUploadFailedReason(
): reason is MultipartUploadFailedReason {
return ["INVALID", "UNDERFUNDED"].includes(reason ?? "");
}

function entityToInFlightMultiPartUpload(
entity: InFlightMultiPartUploadDBResult
): InFlightMultiPartUpload {
return {
uploadId: entity.upload_id,
uploadKey: entity.upload_key,
createdAt: entity.created_at,
expiresAt: entity.expires_at,
chunkSize: entity.chunk_size ? +entity.chunk_size : undefined,
failedReason: isMultipartUploadFailedReason(entity.failed_reason)
? entity.failed_reason
: undefined,
};
}
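The effect of the compare-and-set upsert in updateMultipartChunkSize is that out-of-order chunk reports settle on the largest observed size. A minimal sketch of the expected behavior (the ids, sizes, and wrapper function are illustrative):

// Sketch: the final (smaller) chunk may be reported before a regular-sized chunk.
async function demonstrateChunkSizeCas(db: PostgresDatabase) {
  const upload = await db.insertInFlightMultiPartUpload({
    uploadId: "example-upload-id",
    uploadKey: "example/upload/key",
  });

  await db.updateMultipartChunkSize(1_024, upload); // last chunk arrives first -> 1024 recorded
  const applied = await db.updateMultipartChunkSize(10 * 1024 * 1024, upload); // larger size wins

  // applied === 10 * 1024 * 1024; a later call with a smaller size leaves it unchanged
  // and logs "Chunk size not updated because current size is larger or equal."
  return applied;
}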
33 changes: 29 additions & 4 deletions src/arch/payment.ts
@@ -87,7 +87,7 @@ export interface PaymentService {
): Promise<ReserveBalanceResponse>;
refundBalanceForData(params: RefundBalanceParams): Promise<void>;
getFiatToARConversionRate(currency: "usd"): Promise<number>; // TODO: create type for currency
paymentServiceURL: string;
paymentServiceURL: string | undefined;
}

const allowedReserveBalanceResponse: ReserveBalanceResponse = {
@@ -103,8 +103,8 @@ export class TurboPaymentService implements PaymentService {
// TODO: create a client config with base url pointing at the base url of the payment service
private readonly axios: AxiosInstance = createAxiosInstance({}),
private readonly logger: winston.Logger = defaultLogger,
readonly paymentServiceURL: string = process.env.PAYMENT_SERVICE_BASE_URL ??
"payment.ardrive.dev",
readonly paymentServiceURL: string | undefined = process.env
.PAYMENT_SERVICE_BASE_URL,
paymentServiceProtocol: string = process.env.PAYMENT_SERVICE_PROTOCOL ??
"https"
) {
@@ -114,7 +114,9 @@
shouldAllowArFSData,
});
this.axios = axios;
this.paymentServiceURL = `${paymentServiceProtocol}://${paymentServiceURL}`;
this.paymentServiceURL = paymentServiceURL
? `${paymentServiceProtocol}://${paymentServiceURL}`
: undefined;
}

public async checkBalanceForData({
@@ -142,6 +144,17 @@
};
}

if (!this.paymentServiceURL) {
logger.debug(
"No payment service URL supplied. Simulating no balance at payment service..."
);

return {
userHasSufficientBalance: false,
bytesCostInWinc: W(0),
};
}

logger.debug("Calling payment service to check balance...");

const token = sign({}, secret, {
@@ -237,6 +250,18 @@
return allowedReserveBalanceResponse;
}

if (!this.paymentServiceURL) {
logger.debug(
"No payment service URL supplied. Simulating unsuccessful balance reservation at payment service..."
);

return {
walletExists: false,
costOfDataItem: W(0),
isReserved: false,
};
}

logger.debug("Calling payment service to reserve balance...");

const token = sign({}, secret, {
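The net effect of making paymentServiceURL optional: with no PAYMENT_SERVICE_BASE_URL configured, both balance paths short-circuit with the zero-winc responses shown above rather than calling the payment service over the network. A self-contained sketch of that guard pattern (the types, function, and endpoint path are illustrative stand-ins, not the upload service's real API):

type BalanceCheckResult = { userHasSufficientBalance: boolean; bytesCostInWinc: number };

async function checkBalanceForData(
  paymentServiceURL: string | undefined,
  byteCount: number
): Promise<BalanceCheckResult> {
  if (!paymentServiceURL) {
    // Mirrors the diff: no configured URL means "simulate no balance" instead of failing.
    return { userHasSufficientBalance: false, bytesCostInWinc: 0 };
  }
  // The real service signs a JWT and calls the payment service; this path is a placeholder.
  const response = await fetch(`${paymentServiceURL}/check-balance?byteCount=${byteCount}`);
  return (await response.json()) as BalanceCheckResult;
}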