diff --git a/docker-compose.yml b/docker-compose.yml
index 4c5d352..f947836 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -10,7 +10,7 @@ services:
DB_HOST: upload-service-pg
DB_PORT: 5432
DB_PASSWORD: postgres
- PAYMENT_SERVICE_BASE_URL: ${PAYMENT_SERVICE_BASE_URL:-payment.ardrive.dev}
+ PAYMENT_SERVICE_BASE_URL: ${PAYMENT_SERVICE_BASE_URL:-}
MAX_DATA_ITEM_SIZE: ${MAX_DATA_ITEM_SIZE:-10737418240}
ALLOW_LISTED_ADDRESSES: ${ALLOW_LISTED_ADDRESSES:-}
AWS_ENDPOINT: ${AWS_ENDPOINT:-}
@@ -68,6 +68,8 @@ services:
S3_FORCE_PATH_STYLE: ${S3_FORCE_PATH_STYLE:-}
AWS_REGION: ${AWS_REGION:-us-east-1}
OVERDUE_DATA_ITEM_THRESHOLD_MS: ${OVERDUE_DATA_ITEM_THRESHOLD_MS:-0} # plan data items immediately into bundles when plan-bundle runs
+ ports:
+ - "${FULFILLMENT_PORT:-4000}:${FULFILLMENT_PORT:-4000}"
depends_on:
localstack:
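
The compose change publishes the fulfillment pipeline's HTTP port (default 4000), making the health and metrics endpoints added in app.ts below reachable from the host. A minimal probe sketch, assuming Node 18+ for the global fetch and the default FULFILLMENT_PORT of 4000:

// Sketch only, not part of the patch.
const port = Number(process.env.FULFILLMENT_PORT ?? 4000);

async function probeFulfillment(): Promise<void> {
  const health = await fetch(`http://localhost:${port}/health`);
  console.log("health:", health.status, await health.text()); // expect 200 "OK"

  const metrics = await fetch(`http://localhost:${port}/fulfillment_metrics`);
  console.log("metrics bytes:", (await metrics.text()).length); // Prometheus exposition text
}

probeFulfillment().catch(console.error);
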
diff --git a/ecs/fulfillment-pipeline/src/app.ts b/ecs/fulfillment-pipeline/src/app.ts
index 1782b2d..07d3fd4 100644
--- a/ecs/fulfillment-pipeline/src/app.ts
+++ b/ecs/fulfillment-pipeline/src/app.ts
@@ -15,6 +15,9 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { Message } from "@aws-sdk/client-sqs";
+import Koa from "koa";
+import Router from "koa-router";
+import * as promClient from "prom-client";
import { Consumer } from "sqs-consumer";
import { ArweaveGateway } from "../../../src/arch/arweaveGateway";
@@ -25,6 +28,7 @@ import { postBundleHandler } from "../../../src/jobs/post";
import { prepareBundleHandler } from "../../../src/jobs/prepare";
import { seedBundleHandler } from "../../../src/jobs/seed";
import globalLogger from "../../../src/logger";
+import { MetricRegistry } from "../../../src/metricRegistry";
import { loadConfig } from "../../../src/utils/config";
import { getS3ObjectStore } from "../../../src/utils/objectStoreUtils";
import { createFinalizeUploadConsumerQueue } from "./jobs/finalize";
@@ -236,7 +240,7 @@ type ConsumerProvisioningConfig = {
const consumerQueues: ConsumerProvisioningConfig[] = [
{
envVarCountStr: process.env.NUM_FINALIZE_UPLOAD_CONSUMERS,
- defaultCount: 10,
+ defaultCount: 2,
createConsumerQueueFn: () =>
createFinalizeUploadConsumerQueue({
logger: globalLogger,
@@ -321,3 +325,32 @@ if (process.env.VERIFY_BUNDLE_ENABLED === "true") {
});
setUpAndStartJobScheduler(verifyBundleJobScheduler);
}
+
+const app = new Koa();
+const router = new Router();
+
+const metricsRegistry = MetricRegistry.getInstance().getRegistry();
+promClient.collectDefaultMetrics({ register: metricsRegistry });
+
+// Prometheus
+router.get(["/fulfillment_metrics", "/metrics"], async (ctx, next) => {
+ ctx.body = await metricsRegistry.metrics();
+ return next();
+});
+
+// Health check
+router.get(["/health", "/"], (ctx, next) => {
+ ctx.body = "OK";
+ return next();
+});
+const port = +(process.env.FULFILLMENT_PORT ?? process.env.PORT ?? 4000);
+app.use(router.routes());
+const server = app.listen(port);
+
+globalLogger.info(
+ `Fulfillment pipeline service started with node environment ${process.env.NODE_ENV} on port ${port}...`
+);
+
+server.on("error", (error) => {
+ globalLogger.error("Server error", error);
+});
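
The new HTTP surface assumes a MetricRegistry singleton in src/metricRegistry.ts that wraps a prom-client Registry; that file is not part of this diff. A minimal sketch of the assumed shape (the counter's metric name is illustrative; only getInstance(), getRegistry(), and the unbundleBdiEnqueueFail counter referenced later in multiPartUploads.ts come from the patch):

import * as promClient from "prom-client";

// Sketch only: the real src/metricRegistry.ts is not included in this diff.
export class MetricRegistry {
  private static instance: MetricRegistry | undefined;
  private readonly registry = new promClient.Registry();

  // Counter referenced by finalizeMPUWithDataItemInfo; the metric name is assumed.
  public static unbundleBdiEnqueueFail = new promClient.Counter({
    name: "unbundle_bdi_enqueue_fail_count",
    help: "Count of failed attempts to enqueue a BDI for unbundling",
  });

  private constructor() {
    this.registry.registerMetric(MetricRegistry.unbundleBdiEnqueueFail);
  }

  public static getInstance(): MetricRegistry {
    if (!MetricRegistry.instance) {
      MetricRegistry.instance = new MetricRegistry();
    }
    return MetricRegistry.instance;
  }

  public getRegistry(): promClient.Registry {
    return this.registry;
  }
}
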
diff --git a/ecs/fulfillment-pipeline/src/jobs/finalize.ts b/ecs/fulfillment-pipeline/src/jobs/finalize.ts
index 9a4c779..67d7671 100644
--- a/ecs/fulfillment-pipeline/src/jobs/finalize.ts
+++ b/ecs/fulfillment-pipeline/src/jobs/finalize.ts
@@ -73,8 +73,9 @@ export function createFinalizeUploadConsumerQueue({
} catch (error) {
if (error instanceof DataItemExistsWarning) {
finalizeUploadLogger.warn("Data item already exists", {
- error,
- message,
+ error: error.message,
+ messageId: message.MessageId,
+ messageBody: message.Body,
});
return;
} else if (error instanceof InsufficientBalance) {
diff --git a/package.json b/package.json
index b05ab30..1b4601f 100644
--- a/package.json
+++ b/package.json
@@ -77,7 +77,7 @@
"typescript": "^4.7.4"
},
"dependencies": {
- "@ardrive/ardrive-promise-cache": "^1.1.4",
+ "@ardrive/ardrive-promise-cache": "^1.3.0",
"@aws-sdk/abort-controller": "^3.303.0",
"@aws-sdk/client-s3": "^3.367.0",
"@aws-sdk/client-secrets-manager": "^3.363.0",
diff --git a/src/arch/db/database.ts b/src/arch/db/database.ts
index d9b1c42..46e0c94 100644
--- a/src/arch/db/database.ts
+++ b/src/arch/db/database.ts
@@ -152,7 +152,7 @@ export interface Database {
}: {
uploadId: UploadId;
uploadKey: string;
- }): Promise<void>;
+ }): Promise<InFlightMultiPartUpload>;
finalizeMultiPartUpload(params: {
uploadId: UploadId;
etag: string;
@@ -180,8 +180,8 @@ export interface Database {
): Promise;
updateMultipartChunkSize(
chunkSize: number,
- uploadId: UploadId
- ): Promise<void>;
+ upload: InFlightMultiPartUpload
+ ): Promise<number>;
updatePlannedDataItemAsFailed(params: {
dataItemId: DataItemId;
diff --git a/src/arch/db/postgres.ts b/src/arch/db/postgres.ts
index bf4e594..887de5e 100644
--- a/src/arch/db/postgres.ts
+++ b/src/arch/db/postgres.ts
@@ -36,7 +36,6 @@ import {
FinishedMultiPartUploadDBInsert,
FinishedMultiPartUploadDBResult,
InFlightMultiPartUpload,
- InFlightMultiPartUploadDBInsert,
InFlightMultiPartUploadDBResult,
InFlightMultiPartUploadParams,
InsertNewBundleParams,
@@ -968,18 +967,27 @@ export class PostgresDatabase implements Database {
public async insertInFlightMultiPartUpload({
uploadId,
uploadKey,
- }: InFlightMultiPartUploadParams): Promise<void> {
+ }: InFlightMultiPartUploadParams): Promise<InFlightMultiPartUpload> {
this.log.debug("Inserting in flight multipart upload...", {
uploadId,
uploadKey,
});
- return this.writer.transaction(async (knexTransaction) => {
- await knexTransaction(tableNames.inFlightMultiPartUpload).insert({
- upload_id: uploadId,
- upload_key: uploadKey,
- });
+ const result = await this.writer.transaction(async (knexTransaction) => {
+ const [insertedRow] =
+ await knexTransaction(
+ tableNames.inFlightMultiPartUpload
+ )
+ .insert({
+ upload_id: uploadId,
+ upload_key: uploadKey,
+ })
+ .returning("*"); // Returning the inserted row
+
+ return insertedRow;
});
+
+ return entityToInFlightMultiPartUpload(result);
}
public async finalizeMultiPartUpload({
@@ -1042,18 +1050,7 @@ export class PostgresDatabase implements Database {
throw new MultiPartUploadNotFound(uploadId);
}
- return {
- uploadId: inFlightUpload.upload_id,
- uploadKey: inFlightUpload.upload_key,
- createdAt: inFlightUpload.created_at,
- expiresAt: inFlightUpload.expires_at,
- chunkSize: inFlightUpload.chunk_size
- ? +inFlightUpload.chunk_size
- : undefined,
- failedReason: isMultipartUploadFailedReason(inFlightUpload.failed_reason)
- ? inFlightUpload.failed_reason
- : undefined,
- };
+ return entityToInFlightMultiPartUpload(inFlightUpload);
}
public async failInflightMultiPartUpload({
@@ -1093,20 +1090,7 @@ export class PostgresDatabase implements Database {
`Deleted ${numDeletedRows} in flight uploads past their expired dates.`
);
- return {
- uploadId: updatedInFlightUpload.upload_id,
- uploadKey: updatedInFlightUpload.upload_key,
- createdAt: updatedInFlightUpload.created_at,
- expiresAt: updatedInFlightUpload.expires_at,
- chunkSize: updatedInFlightUpload.chunk_size
- ? +updatedInFlightUpload.chunk_size
- : undefined,
- failedReason: isMultipartUploadFailedReason(
- updatedInFlightUpload.failed_reason
- )
- ? updatedInFlightUpload.failed_reason
- : undefined,
- };
+ return entityToInFlightMultiPartUpload(updatedInFlightUpload);
});
}
@@ -1203,20 +1187,56 @@ export class PostgresDatabase implements Database {
public async updateMultipartChunkSize(
chunkSize: number,
- uploadId: UploadId
- ): Promise<void> {
+ upload: InFlightMultiPartUpload
+ ): Promise<number> {
this.log.debug("Updating multipart chunk size...", {
chunkSize,
});
- await this.writer(
- tableNames.inFlightMultiPartUpload
- )
- .update({
- chunk_size: chunkSize.toString(),
- })
- .where({ upload_id: uploadId })
- .forUpdate();
+ // TODO: This will be brittle if we add more columns to the inFlightMultiPartUpload table
+ const { uploadId, uploadKey, createdAt, expiresAt, failedReason } = upload;
+ const chunkSizeStr = chunkSize.toString();
+
+ // Use a CAS upsert to ensure we only update the chunk size if it's larger than the current value
+ // This assists in cases where the last chunk might have been processed first
+ const bestKnownChunkSize = await this.writer.transaction(async (trx) => {
+ const query = `
+ INSERT INTO ${tableNames.inFlightMultiPartUpload} (upload_id, upload_key, created_at, expires_at, chunk_size, failed_reason)
+ VALUES (?, ?, ?, ?, ?, ?)
+ ON CONFLICT (upload_id) DO UPDATE SET
+ chunk_size = CASE
+ WHEN ${tableNames.inFlightMultiPartUpload}.chunk_size IS NULL OR ${tableNames.inFlightMultiPartUpload}.chunk_size::bigint < EXCLUDED.chunk_size::bigint
+ THEN EXCLUDED.chunk_size
+ ELSE ${tableNames.inFlightMultiPartUpload}.chunk_size
+ END
+ RETURNING chunk_size;
+ `;
+
+ const result = await trx.raw(query, [
+ uploadId,
+ uploadKey,
+ createdAt,
+ expiresAt,
+ chunkSizeStr,
+ failedReason || null,
+ ]);
+ return result.rows[0].chunk_size;
+ });
+
+ if (bestKnownChunkSize !== chunkSizeStr) {
+ this.log.warn(
+ "Chunk size not updated because current size is larger or equal.",
+ {
+ currentChunkSize: bestKnownChunkSize,
+ attemptedChunkSize: chunkSize,
+ }
+ );
+ } else {
+ this.log.debug("Chunk size updated successfully.", {
+ chunkSize,
+ });
+ }
+ return +bestKnownChunkSize;
}
public async updatePlannedDataItemAsFailed({
@@ -1256,3 +1276,18 @@ function isMultipartUploadFailedReason(
): reason is MultipartUploadFailedReason {
return ["INVALID", "UNDERFUNDED"].includes(reason ?? "");
}
+
+function entityToInFlightMultiPartUpload(
+ entity: InFlightMultiPartUploadDBResult
+): InFlightMultiPartUpload {
+ return {
+ uploadId: entity.upload_id,
+ uploadKey: entity.upload_key,
+ createdAt: entity.created_at,
+ expiresAt: entity.expires_at,
+ chunkSize: entity.chunk_size ? +entity.chunk_size : undefined,
+ failedReason: isMultipartUploadFailedReason(entity.failed_reason)
+ ? entity.failed_reason
+ : undefined,
+ };
+}
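
The ON CONFLICT upsert is the concurrency-sensitive piece: whichever writer carries the largest chunk size wins, so chunks arriving out of order converge on the same part sizing. Its intent, restated as plain TypeScript purely for illustration (chunk_size is stored as text, hence the string/BigInt handling):

// Illustration only: the stored value and the incoming value race, and the larger wins.
function resolveChunkSize(
  storedChunkSize: string | null,
  incomingChunkSize: string
): string {
  // Mirrors: WHEN chunk_size IS NULL OR chunk_size::bigint < EXCLUDED.chunk_size::bigint
  if (storedChunkSize === null || BigInt(storedChunkSize) < BigInt(incomingChunkSize)) {
    return incomingChunkSize;
  }
  return storedChunkSize;
}

// A 5 MiB trailing chunk arriving before any 25 MB chunk:
// resolveChunkSize(null, "5242880")        === "5242880"
// resolveChunkSize("5242880", "25000000")  === "25000000"
// resolveChunkSize("25000000", "5242880")  === "25000000"  (the warning branch above)
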
diff --git a/src/arch/payment.ts b/src/arch/payment.ts
index 5d6097a..d23191d 100644
--- a/src/arch/payment.ts
+++ b/src/arch/payment.ts
@@ -87,7 +87,7 @@ export interface PaymentService {
): Promise<ReserveBalanceResponse>;
refundBalanceForData(params: RefundBalanceParams): Promise<void>;
getFiatToARConversionRate(currency: "usd"): Promise<number>; // TODO: create type for currency
- paymentServiceURL: string;
+ paymentServiceURL: string | undefined;
}
const allowedReserveBalanceResponse: ReserveBalanceResponse = {
@@ -103,8 +103,8 @@ export class TurboPaymentService implements PaymentService {
// TODO: create a client config with base url pointing at the base url of the payment service
private readonly axios: AxiosInstance = createAxiosInstance({}),
private readonly logger: winston.Logger = defaultLogger,
- readonly paymentServiceURL: string = process.env.PAYMENT_SERVICE_BASE_URL ??
- "payment.ardrive.dev",
+ readonly paymentServiceURL: string | undefined = process.env
+ .PAYMENT_SERVICE_BASE_URL,
paymentServiceProtocol: string = process.env.PAYMENT_SERVICE_PROTOCOL ??
"https"
) {
@@ -114,7 +114,9 @@ export class TurboPaymentService implements PaymentService {
shouldAllowArFSData,
});
this.axios = axios;
- this.paymentServiceURL = `${paymentServiceProtocol}://${paymentServiceURL}`;
+ this.paymentServiceURL = paymentServiceURL
+ ? `${paymentServiceProtocol}://${paymentServiceURL}`
+ : undefined;
}
public async checkBalanceForData({
@@ -142,6 +144,17 @@ export class TurboPaymentService implements PaymentService {
};
}
+ if (!this.paymentServiceURL) {
+ logger.debug(
+ "No payment service URL supplied. Simulating no balance at payment service..."
+ );
+
+ return {
+ userHasSufficientBalance: false,
+ bytesCostInWinc: W(0),
+ };
+ }
+
logger.debug("Calling payment service to check balance...");
const token = sign({}, secret, {
@@ -237,6 +250,18 @@ export class TurboPaymentService implements PaymentService {
return allowedReserveBalanceResponse;
}
+ if (!this.paymentServiceURL) {
+ logger.debug(
+ "No payment service URL supplied. Simulating unsuccessful balance reservation at payment service..."
+ );
+
+ return {
+ walletExists: false,
+ costOfDataItem: W(0),
+ isReserved: false,
+ };
+ }
+
logger.debug("Calling payment service to reserve balance...");
const token = sign({}, secret, {
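
With PAYMENT_SERVICE_BASE_URL now optional, an unset value no longer falls back to payment.ardrive.dev; balance checks and reservations are simulated as failures instead. A usage sketch of the expected behavior, assuming the import path shown and that every constructor parameter has a default as in the class above:

import { TurboPaymentService } from "./src/arch/payment"; // path assumed

// With PAYMENT_SERVICE_BASE_URL unset, paymentServiceURL stays undefined instead of
// defaulting to "payment.ardrive.dev".
const paymentService = new TurboPaymentService();
console.log(paymentService.paymentServiceURL); // undefined

// Unless an earlier allow-list / SKIP_BALANCE_CHECKS branch returns first, the methods
// then short-circuit:
//   checkBalanceForData(...)   -> { userHasSufficientBalance: false, bytesCostInWinc: W(0) }
//   reserveBalanceForData(...) -> { walletExists: false, costOfDataItem: W(0), isReserved: false }
// which surfaces to uploaders as InsufficientBalance (402) rather than a 503, matching
// the router.int.test.ts change below.
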
diff --git a/src/jobs/optical-post.ts b/src/jobs/optical-post.ts
index b6f10a7..8c43cc7 100644
--- a/src/jobs/optical-post.ts
+++ b/src/jobs/optical-post.ts
@@ -22,12 +22,7 @@ import winston from "winston";
import { createAxiosInstance } from "../arch/axiosClient";
import logger from "../logger";
import { getOpticalPubKey } from "../utils/getArweaveWallet";
-import {
- SignedDataItemHeader,
- encodeTagsForOptical,
- getNestedDataItemHeaders,
- signDataItemHeader,
-} from "../utils/opticalUtils";
+import { SignedDataItemHeader } from "../utils/opticalUtils";
/** These don't need to succeed */
const optionalOpticalUrls =
@@ -62,31 +57,7 @@ export const opticalPostHandler = async ({
);
const dataItemIds = dataItemHeaders.map((header) => header.id);
- let childLogger = logger.child({ dataItemIds });
-
- // If any BDIs are detected, unpack them 1 nested level deep
- // and include the nested data items in the optical post
- // TODO: Filter this further for ArDrive data
- const nestedStringifiedHeaders = await Promise.all(
- (
- await getNestedDataItemHeaders({
- potentialBDIHeaders: dataItemHeaders,
- logger: childLogger,
- })
- ).map(async (nestedHeader) => {
- const opticalNestedHeader = await signDataItemHeader(
- encodeTagsForOptical(nestedHeader)
- );
- return JSON.stringify(opticalNestedHeader);
- })
- );
- const nestedDataItemIds = nestedStringifiedHeaders.map(
- (headerString) => (JSON.parse(headerString) as SignedDataItemHeader).id
- );
- childLogger = childLogger.child({ nestedDataItemIds });
- stringifiedDataItemHeaders = stringifiedDataItemHeaders.concat(
- nestedStringifiedHeaders
- );
+ const childLogger = logger.child({ dataItemIds });
// Create a JSON array string out of the stringified headers
const postBody = `[${stringifiedDataItemHeaders.join(",")}]`;
diff --git a/src/jobs/unbundle-bdi.ts b/src/jobs/unbundle-bdi.ts
index 96cdca8..d4d2d3a 100644
--- a/src/jobs/unbundle-bdi.ts
+++ b/src/jobs/unbundle-bdi.ts
@@ -20,16 +20,21 @@ import { SQSEvent } from "aws-lambda";
import pLimit from "p-limit";
import winston from "winston";
-import { deleteMessages } from "../arch/queues";
+import { deleteMessages, enqueue } from "../arch/queues";
import { rawDataItemStartFromParsedHeader } from "../bundles/rawDataItemStartFromParsedHeader";
import baseLogger from "../logger";
import { ParsedDataItemHeader, TransactionId } from "../types/types";
+import { ownerToNormalizedB64Address } from "../utils/base64";
import { payloadContentTypeFromDecodedTags } from "../utils/common";
import {
getDataItemData,
getS3ObjectStore,
putDataItemRaw,
} from "../utils/objectStoreUtils";
+import {
+ encodeTagsForOptical,
+ signDataItemHeader,
+} from "../utils/opticalUtils";
export const handler = async (event: SQSEvent) => {
const handlerLogger = baseLogger.child({ job: "unbundle-bdi-job" });
@@ -126,7 +131,15 @@ export async function unbundleBDIHandler(
const nestedDataItemParallelLimit = pLimit(10);
await Promise.all(
parsedDataItemHeaders.map((parsedDataItemHeader) => {
- const { id, tags, dataOffset, dataSize } = parsedDataItemHeader;
+ const {
+ id,
+ tags,
+ dataOffset,
+ dataSize,
+ signature,
+ target,
+ owner,
+ } = parsedDataItemHeader;
const nestedItemLogger = bdiLogger.child({
nestedDataItemId: id,
});
@@ -157,6 +170,23 @@ export async function unbundleBDIHandler(
dataOffset - rawDataItemDataStart
);
+ // TODO: Consider enqueue in batches
+ await enqueue(
+ "optical-post",
+ await signDataItemHeader(
+ encodeTagsForOptical({
+ id,
+ signature,
+ owner,
+ owner_address: ownerToNormalizedB64Address(owner),
+ target: target ?? "",
+ content_type: payloadContentType,
+ data_size: dataSize,
+ tags,
+ })
+ )
+ );
+
nestedItemLogger.debug("Finished caching nested data item.", {
payloadContentType,
rangeString,
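
Opticalizing nested data items now happens in this job rather than inside optical-post. A sketch of the header it enqueues, using values from the fixtures removed from opticalUtils.test.ts below; the import path is assumed and the signature/owner strings are elided placeholders:

import { encodeTagsForOptical, signDataItemHeader } from "./src/utils/opticalUtils"; // path assumed

const nestedHeader = {
  id: "5SLX-Vuzy90kg2DBv8yrOCfUPF-4KB43fFv5uUwxzYc",
  signature: "(base64url signature elided)",
  owner: "(base64url owner key elided)",
  owner_address: "31LPFYoow2G7j-eSSsrIh8OlNaARZ84-80J-8ba68d8",
  target: "",
  content_type: "application/json",
  data_size: 160,
  tags: [{ name: "Content-Type", value: "application/json" }],
};

// encodeTagsForOptical base64url-encodes tag names and values; signDataItemHeader then
// attaches a signature over the header, producing the SignedDataItemHeader that lands
// on the "optical-post" queue.
void signDataItemHeader(encodeTagsForOptical(nestedHeader));
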
diff --git a/src/routes/multiPartUploads.ts b/src/routes/multiPartUploads.ts
index 7929531..8467337 100644
--- a/src/routes/multiPartUploads.ts
+++ b/src/routes/multiPartUploads.ts
@@ -14,6 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
+import { ReadThroughPromiseCache } from "@ardrive/ardrive-promise-cache";
import { Message } from "@aws-sdk/client-sqs";
import { JWKInterface, Tag } from "arbundles";
import { Base64UrlString } from "arweave/node/lib/utils";
@@ -68,6 +69,7 @@ import {
uploadPart,
} from "../utils/objectStoreUtils";
import {
+ containsAns104Tags,
encodeTagsForOptical,
signDataItemHeader,
} from "../utils/opticalUtils";
@@ -80,11 +82,26 @@ import {
const shouldSkipBalanceCheck = process.env.SKIP_BALANCE_CHECKS === "true";
const opticalBridgingEnabled = process.env.OPTICAL_BRIDGING_ENABLED !== "false";
+const maxAllowablePartNumber = 10_000; // AWS S3 MultiPartUpload part number limitation
export const chunkMinSize = 1024 * 1024 * 5; // 5MiB - AWS minimum
export const chunkMaxSize = 1024 * 1024 * 500; // 500MiB // NOTE: AWS supports up to 5GiB
export const defaultChunkSize = 25_000_000; // 25MB
+const inFlightUploadCache = new ReadThroughPromiseCache<
+ UploadId,
+ InFlightMultiPartUpload,
+ Database
+>({
+ cacheParams: {
+ cacheCapacity: 1000,
+ cacheTTL: 60_000,
+ },
+ readThroughFunction: async (uploadId, database) => {
+ return database.getInflightMultiPartUpload(uploadId);
+ },
+});
+
export async function createMultiPartUpload(ctx: KoaContext) {
const { database, objectStore, logger } = ctx.state;
logger.debug("Creating new multipart upload");
@@ -93,10 +110,13 @@ export async function createMultiPartUpload(ctx: KoaContext) {
logger.debug("Created new multipart upload", { newUploadId });
// create new upload
- await database.insertInFlightMultiPartUpload({
- uploadId: newUploadId,
- uploadKey,
- });
+ await inFlightUploadCache.put(
+ newUploadId,
+ database.insertInFlightMultiPartUpload({
+ uploadId: newUploadId,
+ uploadKey,
+ })
+ );
// In order to combat RDS replication-lag-related issues with posting chunks (parts)
// for this uploadId immediately after receiving the fresh uploadId, impose an
@@ -261,7 +281,7 @@ export async function postDataItemChunk(ctx: KoaContext) {
const contentLength = ctx.req.headers["content-length"];
- if (!contentLength) {
+ if (!contentLength || isNaN(+contentLength) || +contentLength <= 0) {
ctx.status = 400;
ctx.message =
"Content-Length header is required a must be a positive integer.";
@@ -271,7 +291,7 @@ export async function postDataItemChunk(ctx: KoaContext) {
logger.debug("Posting data item chunk", { uploadId, chunkOffset });
// check that upload exists
try {
- const upload = await database.getInflightMultiPartUpload(uploadId);
+ const upload = await inFlightUploadCache.get(uploadId, database);
logger.debug("Got multipart upload", { ...upload });
// No need to proceed if this upload has already failed
@@ -279,35 +299,48 @@ export async function postDataItemChunk(ctx: KoaContext) {
throw new InvalidDataItem();
}
- const sizeOfChunk = contentLength ? +contentLength : defaultChunkSize;
- const chunkSize = upload.chunkSize || sizeOfChunk;
+ const sizeOfIncomingChunk = +contentLength;
+ const expectedChunkSize = await computeExpectedChunkSize({
+ upload,
+ sizeOfIncomingChunk,
+ logger,
+ database,
+ });
- try {
- if (!upload.chunkSize || sizeOfChunk > upload.chunkSize) {
- logger.debug("Updating chunk size in database", {
- uploadId,
- sizeOfChunk,
- });
- // NOTE: this may be better suited in a redis or read through cache
- await database.updateMultipartChunkSize(sizeOfChunk, uploadId);
- logger.debug("Successfully updated chunk size 👍", {
- uploadId,
- sizeOfChunk,
- });
- }
- logger.debug("Retrieved chunk size for upload", {
+ if (chunkOffset % expectedChunkSize !== 0) {
+ /* This can happen when the last chunk is processed first and
+ has a size that is not a multiple of the expected chunk size.
+ Retrying that chunk upload should usually clear that up.
+
+ A problematic case is when the chunk is smaller than the intended
+ chunk size but is a multiple of it. In this case, we can't tell
+ if the chunk size is wrong or if the chunk is the last one. But
+ two outcomes are possible there:
+ 1) The computed part number is large, but sufficiently higher than
+ the preceding chunk's will be. If so, the upload can still complete.
+ 2) The part number chosen is too large, and the chunk upload will fail,
+ but might succeed on a successive try. Forcing the part number to
+ the max allowed in this case is not worth the risk of getting it wrong.
+ */
+
+ // TODO: Could also check db again for updated chunk size from other chunks
+ inFlightUploadCache.remove(uploadId); // Precautionary measure
+ throw new InvalidChunk();
+ }
+
+ const partNumber = Math.floor(chunkOffset / expectedChunkSize) + 1; // + 1 due to 1-indexing of part numbers
+ if (partNumber > maxAllowablePartNumber) {
+ // This can happen if the user chose a chunk size too small for the number of chunks their upload needs
+ logger.error("Part number exceeds maximum allowable part number", {
uploadId,
- sizeOfChunk,
- });
- } catch (error) {
- logger.warn("Collision while updating chunk size... continuing.", {
- uploadId,
- error: error instanceof Error ? error.message : error,
+ partNumber,
+ chunkOffset,
+ expectedChunkSize,
+ sizeOfIncomingChunk,
});
+ throw new InvalidChunk();
}
- const partNumber = Math.floor(chunkOffset / chunkSize) + 1;
-
// Need to give content length here for last chunk or s3 will wait for more data
const etag = await uploadPart({
objectStore,
@@ -315,9 +348,14 @@ export async function postDataItemChunk(ctx: KoaContext) {
stream: ctx.req,
uploadId,
partNumber,
- sizeOfChunk,
+ sizeOfChunk: sizeOfIncomingChunk,
+ });
+ logger.info("Uploaded part", {
+ uploadId,
+ partNumber,
+ etag,
+ sizeOfChunk: sizeOfIncomingChunk,
});
- logger.info("Uploaded part", { uploadId, partNumber, etag, sizeOfChunk });
ctx.status = 200;
} catch (error) {
@@ -344,6 +382,49 @@ export async function postDataItemChunk(ctx: KoaContext) {
return; // do not return next();
}
+async function computeExpectedChunkSize({
+ upload,
+ logger,
+ sizeOfIncomingChunk,
+ database,
+}: {
+ upload: InFlightMultiPartUpload;
+ sizeOfIncomingChunk: number;
+ logger: winston.Logger;
+ database: Database;
+}): Promise<number> {
+ const uploadId = upload.uploadId;
+ let expectedChunkSize = upload.chunkSize;
+
+ if (!expectedChunkSize || sizeOfIncomingChunk > expectedChunkSize) {
+ logger.debug("Updating chunk size in database", {
+ uploadId,
+ prevChunkSize: expectedChunkSize,
+ newChunkSize: sizeOfIncomingChunk,
+ });
+ // NOTE: this may be better suited in a redis + read through cache
+ expectedChunkSize = await database.updateMultipartChunkSize(
+ sizeOfIncomingChunk,
+ upload
+ );
+ // Memoize this update
+ upload.chunkSize = expectedChunkSize;
+ void inFlightUploadCache.put(uploadId, Promise.resolve(upload));
+ logger.debug("Successfully updated chunk size 👍", {
+ uploadId,
+ estimatedSizeOfIncomingChunk: sizeOfIncomingChunk,
+ expectedChunkSize,
+ });
+ }
+ logger.debug("Retrieved chunk size for upload", {
+ uploadId,
+ sizeOfChunk: sizeOfIncomingChunk,
+ expectedChunkSize,
+ });
+
+ return expectedChunkSize;
+}
+
export async function finalizeMultipartUploadWithQueueMessage({
message,
paymentService,
@@ -544,7 +625,7 @@ export async function finalizeMultipartUpload({
};
}
- const inFlightMPUEntity = await database.getInflightMultiPartUpload(uploadId);
+ const inFlightMPUEntity = await inFlightUploadCache.get(uploadId, database);
const signedReceipt = await finalizeMPUWithInFlightEntity({
uploadId,
paymentService,
@@ -696,6 +777,7 @@ export async function finalizeMPUWithInFlightEntity({
uploadId,
failedReason: "INVALID",
});
+ inFlightUploadCache.remove(uploadId);
dataItemReadable.destroy();
throw new InvalidDataItem();
}
@@ -732,8 +814,6 @@ export async function finalizeMPUWithInFlightEntity({
};
fnLogger = fnLogger.child(filterKeysFromObject(dataItemInfo, ["signature"]));
- // TODO: handle bdis?
-
fnLogger.info("Parsed multi-part upload data item id and tags", {
tags,
});
@@ -745,6 +825,7 @@ export async function finalizeMPUWithInFlightEntity({
etag: finalizedEtag,
dataItemId,
});
+ inFlightUploadCache.remove(uploadId);
return await finalizeMPUWithValidatedInfo({
uploadId,
@@ -1039,6 +1120,7 @@ export async function finalizeMPUWithDataItemInfo({
uploadId,
failedReason: "UNDERFUNDED",
});
+ inFlightUploadCache.remove(uploadId);
throw new InsufficientBalance();
}
@@ -1077,6 +1159,21 @@ export async function finalizeMPUWithDataItemInfo({
fnLogger.debug("Skipped optical posting.");
}
+ // Enqueue data item for unbundling if it appears to be a BDI
+ if (containsAns104Tags(dataItemInfo.tags)) {
+ try {
+ logger.debug("Enqueuing BDI for unbundling...");
+ await enqueue("unbundle-bdi", dataItemInfo.dataItemId);
+ } catch (bdiEnqueueError) {
+ // Soft error, just log
+ logger.error(
+ `Error while attempting to enqueue for bdi unbundling!`,
+ bdiEnqueueError
+ );
+ MetricRegistry.unbundleBdiEnqueueFail.inc();
+ }
+ }
+
fnLogger.debug("Inserting new_data_item into db...");
const dbInsertStart = Date.now();
// TODO: Add deadline height to the new data item entity
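
Part numbers are now derived purely from the chunk offset and the best-known chunk size, capped at S3's 10,000-part limit. The arithmetic, extracted for illustration (the function name and thrown errors below are illustrative, not the route's actual InvalidChunk errors):

const maxAllowablePartNumber = 10_000; // mirrors the constant above

function partNumberForChunk(chunkOffset: number, expectedChunkSize: number): number {
  if (chunkOffset % expectedChunkSize !== 0) {
    // Corresponds to the InvalidChunk path: offsets must land on chunk boundaries.
    throw new Error("chunk offset is not aligned to the expected chunk size");
  }
  const partNumber = Math.floor(chunkOffset / expectedChunkSize) + 1; // parts are 1-indexed
  if (partNumber > maxAllowablePartNumber) {
    // e.g. a 10 GiB upload with 1 MiB chunks needs ~10,240 parts and cannot complete.
    throw new Error("part number exceeds the S3 limit");
  }
  return partNumber;
}

// With the default 25 MB chunk size:
// partNumberForChunk(0, 25_000_000)           === 1
// partNumberForChunk(50_000_000, 25_000_000)  === 3
// partNumberForChunk(60_000_000, 25_000_000)  throws (unaligned offset)
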
diff --git a/src/utils/opticalUtils.test.ts b/src/utils/opticalUtils.test.ts
index 1f31d86..3bf9000 100644
--- a/src/utils/opticalUtils.test.ts
+++ b/src/utils/opticalUtils.test.ts
@@ -16,16 +16,11 @@
*/
import chai from "chai";
import deepEqualInAnyOrder from "deep-equal-in-any-order";
-import { createReadStream } from "fs";
-import { stub } from "sinon";
-import { FileSystemObjectStore } from "../arch/fileSystemObjectStore";
-import logger from "../logger";
import {
DataItemHeader,
encodeTagsForOptical,
filterForNestedBundles,
- getNestedDataItemHeaders,
} from "./opticalUtils";
const { expect } = chai;
@@ -139,124 +134,3 @@ describe("The filterForNestedBundles function", () => {
expect(filterForNestedBundles(stubDataItemHeader)).to.equal(true);
});
});
-
-describe("The getNestedDataItemHeaders function", () => {
- const objectStore = new FileSystemObjectStore();
-
- it("returns correct nested data item headers for a real bundled data item", async () => {
- stub(objectStore, "getObject").resolves({
- readable: createReadStream("tests/stubFiles/bdiDataItem"),
- etag: "stubEtag",
- });
- stub(objectStore, "getObjectPayloadInfo").resolves({
- payloadContentType: "application/octet-stream",
- payloadDataStart: 1100,
- });
-
- expect(
- await getNestedDataItemHeaders({
- objectStore,
- logger: logger,
- potentialBDIHeaders: [
- {
- id: "cTbz16hHhGW4HF-uMJ5u8RoCg9atYmyMFWGd-kzhF_Q",
- owner: "owner",
- owner_address: "owner_address",
- signature: "signature",
- target: "target",
- content_type: "content_type",
- data_size: 1234,
- tags: [
- { name: "QnVuZGxlLUZvcm1hdA", value: "YmluYXJ5" },
- { name: "QnVuZGxlLVZlcnNpb24", value: "Mi4wLjA" },
- ],
- },
- ],
- })
- ).to.deep.equalInAnyOrder([
- {
- id: "5SLX-Vuzy90kg2DBv8yrOCfUPF-4KB43fFv5uUwxzYc",
- signature:
- "eqnekVilJFbzVQa2g-AIkX10trRIBEKZV_0tz0zya_XJdcrVATm7_0nWk3VIXXz4-Cykd9BBS0AGbcLmFfohiiaA5knAXGx3WP0bMyCiUR4TgkNIwVnQDtCKRPWdEaFizd-t6PqVy-KIfG0iJoKqE4u0BanBjgSU-R_7-K4pP3g9d3ScKS8vImLAmVfy29ubyE5ubALNl1c0OruSRfig37DT0Vf1ZDfqmllPcRsUrxbXmX2dtcEvDcaotqgcRTu8iRCYeECGOl7hFR50SUAKAQKjJKdGbR_5dFdGLBT2clzwGAHaKdq87-eMseHwbCgT9gPvqHmpBshFGSab8dzwIiNXGLquAxrzL8i3Y2Bl578BPhCIGRM9Vm0T1kiQBECtsORqZVwENI-urKU3BVgGLDmfL9iplNHbPhvKwgEbTaFvi6SMAfbecZSxMF5UzZfpm-m4O2Ba0z6iEoB9iLwEx8t446Bjn3iTLgRpvO5epeYICUjN7cBRelnbgj2No9hHXyea1VYnPUzhmDkiYoatIL7ISXdXr5OY0urjCqzqgLOluA2QvXszEf1kO61SIOblow1i38fvfe06j2txPSlccZTGh_Ug7yntTa6WijlFA0tfWk_AGFtwogLqOob3vD5lhPACkig6UPDonSD646xD4EcQ_sINIGhIlJdMJsF13z4",
- owner:
- "2_E-vc9U6OYFv26YxHNtMSmxsIixQAsFrCQSVW9nEvPHkA2uZ0R7ILwqzqs7DtJoptp5qpar88qtsITQOCcJsSdcISvuJUXVgX_12520bSMhErkGhdOpRI_AWqBJTZwX5dUDf6_jc5JFMHsZar8At2vdq3MTeNJFSh2NoDMFndTkgCZBVrJjtrwXPcFnWZ3C7_DTYwF9-FHVHIQorHuBL5wXJI3gojuBHtuOxWrR61k2V14P05wPKZoJW85hVOzJfd7VECvbMlgfhqpS9Qf7V2pztEdNCOETPN914HaPd4fEAB2BUHFfqt1XTbjEgHADEQHtdwA6sMG17pX1V4POfdnMZEvlK5q7F2LfNYhlkaWV-jUQ36aV6nRRpq9HrAeAwcwLCklqK140axrinMR9gEfKsqCAbbWA92G_Rn88G3y-6F3hw6BPsmNUp8ggW-fzR35YC3BCLNYVwmM5yBQiE1oDoJz1o6q3wR3swHCL_QCvsJ0yDiGWKAt_aHd-58x4lLeFWFRnlGmbMLZ7b6NRhwRvRiAxruYxEGNH6TKr2rVYdLV13CV4frg9V_cf12--rOa8OEYFHcQLqVWBh3z9rA5uO5KMPgkhhRet1pMlTMTa4zcxJ12HeRC4Li297iQh0hf1rYUu4ZGMAEa-ppjhFES67etlN4NlyyONDrFTaLM",
- owner_address: "31LPFYoow2G7j-eSSsrIh8OlNaARZ84-80J-8ba68d8",
- target: "",
- content_type: "application/json",
- data_size: 160,
- tags: [
- { name: "Content-Type", value: "application/json" },
- { name: "ArFS", value: "0.12" },
- { name: "Entity-Type", value: "file" },
- {
- name: "Drive-Id",
- value: "1677890d-5d17-4b70-a1f9-ea3f23ffba30",
- },
- {
- name: "Parent-Folder-Id",
- value: "2e352112-bbe9-4717-9a94-9cdc95a27767",
- },
- {
- name: "File-Id",
- value: "ba300049-f8ee-45a6-92d5-4f384c9b066b",
- },
- { name: "App-Name", value: "ArDrive-App" },
- { name: "App-Platform", value: "Web" },
- { name: "App-Version", value: "2.3.1" },
- { name: "Unix-Time", value: "1687212613" },
- ],
- },
- {
- id: "Is7dfYMRMxxJTiKyxIoJHa-zBD6S58YwhkNEzqq22ww",
- signature:
- "fpRU4294AjPakiTaQuaxgFusemt9wA_9sSxeLzVmpEPN3uVMzZyuMTwdgjcXoLA4lhdplgAXfrUC8cpIyH9hzMWGymEoqYovCrvqngQSEO6k7tuWrDRowaCjwPx2PxtECR41vqF074YVepCRHJ55j1p-J4YQkRudV4Y8UzrfSAUWXhkgBH8Z6MIqF-u9TNSA6cDL5ZZNMzvDccdVLs-ykE--Qf-IkvxoaPdLL41ZbG9C3LbB6IaZSCUBidmuMRcHESUnlofvdPU70gzAsLIZzAg3mqwF8M3FVIOHv4rNFNhyOYOyTLgaJ0KQ7bCNEeUMmaMMEk6mFsrtezy4eoquR6klW8jyrAsh0cbLX0eu-6tu5UDYJdcmcgllgwAti9cJfkwbCe3zvAAfvG_wJvaYN7FRcaipW8DUxfBD0Bab9lNl5DdGYL4oik1OYkyMwlxzrusV3ZjUycZQnUWElMHRX2-Nersj741nIbytAfScG83yiCfS3zXODrysbbc_Tz-Ftp4kR_HcgwhEZbWBhxMLHbvXayDn5gqDqKb81TtxsmL2gkKN6-4A9lhD7L3RdJi96f82iuq-h6P-PMOzkzuCAuy5Gu6J_knDvQ9NrN6L-Kxd3gsJd71GsS2NnuTUUnMGqtXrL55nxoZlYLKGr9O3_t5bj2Py1T3FMbBkqUf4NoE",
- owner:
- "2_E-vc9U6OYFv26YxHNtMSmxsIixQAsFrCQSVW9nEvPHkA2uZ0R7ILwqzqs7DtJoptp5qpar88qtsITQOCcJsSdcISvuJUXVgX_12520bSMhErkGhdOpRI_AWqBJTZwX5dUDf6_jc5JFMHsZar8At2vdq3MTeNJFSh2NoDMFndTkgCZBVrJjtrwXPcFnWZ3C7_DTYwF9-FHVHIQorHuBL5wXJI3gojuBHtuOxWrR61k2V14P05wPKZoJW85hVOzJfd7VECvbMlgfhqpS9Qf7V2pztEdNCOETPN914HaPd4fEAB2BUHFfqt1XTbjEgHADEQHtdwA6sMG17pX1V4POfdnMZEvlK5q7F2LfNYhlkaWV-jUQ36aV6nRRpq9HrAeAwcwLCklqK140axrinMR9gEfKsqCAbbWA92G_Rn88G3y-6F3hw6BPsmNUp8ggW-fzR35YC3BCLNYVwmM5yBQiE1oDoJz1o6q3wR3swHCL_QCvsJ0yDiGWKAt_aHd-58x4lLeFWFRnlGmbMLZ7b6NRhwRvRiAxruYxEGNH6TKr2rVYdLV13CV4frg9V_cf12--rOa8OEYFHcQLqVWBh3z9rA5uO5KMPgkhhRet1pMlTMTa4zcxJ12HeRC4Li297iQh0hf1rYUu4ZGMAEa-ppjhFES67etlN4NlyyONDrFTaLM",
- owner_address: "31LPFYoow2G7j-eSSsrIh8OlNaARZ84-80J-8ba68d8",
- target: "",
- content_type: "image/gif",
- data_size: 51987,
- tags: [
- { name: "App-Name", value: "ArDrive-App" },
- { name: "App-Platform", value: "Web" },
- { name: "App-Version", value: "2.3.1" },
- { name: "Unix-Time", value: "1687212613" },
- { name: "Content-Type", value: "image/gif" },
- ],
- },
- ]);
- });
-
- it("returns an empty array when passed a BDI header that is not a bundle", async () => {
- stub(objectStore, "getObject").resolves({
- readable: createReadStream("tests/stubFiles/stub1115ByteDataItem"),
- etag: "stubEtag",
- });
- stub(objectStore, "getObjectPayloadInfo").resolves({
- payloadContentType: "application/octet-stream",
- payloadDataStart: 1100,
- });
-
- expect(
- await getNestedDataItemHeaders({
- objectStore,
- logger: logger,
- potentialBDIHeaders: [
- {
- id: "cTbz16hHhGW4HF-uMJ5u8RoCg9atYmyMFWGd-kzhF_Q",
- owner: "owner",
- owner_address: "owner_address",
- signature: "signature",
- target: "target",
- content_type: "content_type",
- data_size: 1234,
- tags: [
- { name: "QnVuZGxlLUZvcm1hdA", value: "YmluYXJ5" },
- { name: "QnVuZGxlLVZlcnNpb24", value: "Mi4wLjA" },
- ],
- },
- ],
- })
- ).to.deep.equalInAnyOrder([]);
- });
-});
diff --git a/src/utils/opticalUtils.ts b/src/utils/opticalUtils.ts
index 6364f1f..9ac503f 100644
--- a/src/utils/opticalUtils.ts
+++ b/src/utils/opticalUtils.ts
@@ -14,17 +14,11 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
-import { ArweaveSigner, Tag, deepHash, processStream } from "arbundles";
+import { ArweaveSigner, Tag, deepHash } from "arbundles";
import { stringToBuffer } from "arweave/node/lib/utils";
-import pLimit from "p-limit";
-import winston from "winston";
-import { ObjectStore } from "../arch/objectStore";
-import { ParsedDataItemHeader } from "../types/types";
-import { fromB64Url, ownerToNormalizedB64Address, toB64Url } from "./base64";
-import { payloadContentTypeFromDecodedTags } from "./common";
+import { fromB64Url, toB64Url } from "./base64";
import { getOpticalWallet } from "./getArweaveWallet";
-import { getDataItemData, getS3ObjectStore } from "./objectStoreUtils";
export type DataItemHeader = {
id: string;
@@ -89,87 +83,6 @@ export async function signDataItemHeader(
};
}
-export async function getNestedDataItemHeaders({
- potentialBDIHeaders,
- objectStore = getS3ObjectStore(),
- logger,
-}: {
- potentialBDIHeaders: DataItemHeader[];
- objectStore?: ObjectStore;
- logger: winston.Logger;
-}): Promise<DataItemHeader[]> {
- const decodedDataItemHeaders = potentialBDIHeaders.map(decodeOpticalizedTags);
- const headersRequiringUnpacking = decodedDataItemHeaders.filter(
- filterForNestedBundles
- );
-
- // Keep the parallelization of this work capped at sensible limits
- const bdiParallelLimit = pLimit(10);
- const bdiDataItemsIds = headersRequiringUnpacking.map((header) => header.id);
- const nestedHeadersPromises = bdiDataItemsIds.map((bdiDataItemId) => {
- return bdiParallelLimit(async () => {
- // Fetch the data item
- const dataItemReadable = await getDataItemData(
- objectStore,
- bdiDataItemId
- );
-
- logger.debug("Processing BDI stream...", {
- bdiDataItemId,
- });
-
- let parsedDataItemHeaders: ParsedDataItemHeader[] = [];
- try {
- // Process it as a bundle and get all the data item info
- parsedDataItemHeaders = (await processStream(
- dataItemReadable
- )) as ParsedDataItemHeader[];
- } catch (error) {
- logger.error("Error processing BDI stream.", {
- bdiDataItemId,
- error,
- });
- return [];
- }
-
- logger.debug("Finished processing BDI stream.", {
- bdiDataItemId,
- parsedDataItemHeaders,
- });
-
- // Return the encoded, signed, and serialized headers for all the nested data items
- const dataItemParallelLimit = pLimit(10);
- const nestedDataItemHeaders = await Promise.all(
- parsedDataItemHeaders.map((parsedDataItemHeader) => {
- return dataItemParallelLimit(async () => {
- const decodedTags = parsedDataItemHeader.tags;
-
- // Get content type from tag if possible
- const contentType = payloadContentTypeFromDecodedTags(decodedTags);
-
- return {
- id: parsedDataItemHeader.id,
- signature: parsedDataItemHeader.signature,
- owner: parsedDataItemHeader.owner,
- owner_address: ownerToNormalizedB64Address(
- parsedDataItemHeader.owner
- ),
- target: parsedDataItemHeader.target ?? "",
- content_type: contentType,
- data_size: parsedDataItemHeader.dataSize,
- tags: decodedTags,
- };
- });
- })
- );
-
- return nestedDataItemHeaders;
- });
- });
-
- return (await Promise.all(nestedHeadersPromises)).flat(1);
-}
-
export function filterForNestedBundles(decodedHeader: DataItemHeader): boolean {
return containsAns104Tags(decodedHeader.tags);
}
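
After this removal, BDI detection rests on containsAns104Tags (via filterForNestedBundles here and the finalize route above). Its implementation is not shown in this diff; a sketch of the assumed check, based on the Bundle-Format/Bundle-Version tags seen in the removed test fixtures:

// Sketch only: the real containsAns104Tags lives elsewhere in opticalUtils.ts.
function containsAns104TagsSketch(tags: { name: string; value: string }[]): boolean {
  // Assumed check: an ANS-104 bundle carries Bundle-Format: binary plus a Bundle-Version
  // tag. In the optical fixtures these appear base64url-encoded as "QnVuZGxlLUZvcm1hdA" /
  // "QnVuZGxlLVZlcnNpb24" and are decoded before this check runs.
  return (
    tags.some((tag) => tag.name === "Bundle-Format" && tag.value === "binary") &&
    tags.some((tag) => tag.name === "Bundle-Version")
  );
}
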
diff --git a/tests/router.int.test.ts b/tests/router.int.test.ts
index 0e6d56b..71e4d45 100644
--- a/tests/router.int.test.ts
+++ b/tests/router.int.test.ts
@@ -463,10 +463,8 @@ describe("Router tests", function () {
stub(paymentService, "reserveBalanceForData").throws();
const { status, data } = await postStubDataItem(dataItem);
- expect(data).to.contain(
- "Upload Service is Unavailable. Payment Service is unreachable"
- );
- expect(status).to.equal(503);
+ expect(data).to.contain("Insufficient balance");
+ expect(status).to.equal(402);
});
it("with a data item signed by a non allow listed wallet without balance", async () => {
diff --git a/yarn.lock b/yarn.lock
index 6738ce6..3dab8bd 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -192,12 +192,12 @@ __metadata:
languageName: node
linkType: hard
-"@ardrive/ardrive-promise-cache@npm:^1.1.4":
- version: 1.1.4
- resolution: "@ardrive/ardrive-promise-cache@npm:1.1.4"
+"@ardrive/ardrive-promise-cache@npm:^1.3.0":
+ version: 1.3.0
+ resolution: "@ardrive/ardrive-promise-cache@npm:1.3.0"
dependencies:
"@alexsasharegan/simple-cache": ^3.3.3
- checksum: c92f3f745c2138b2d62c53d6a394900f1691dfe696106cf99d1ca3e71fc284c34cf2c84ceb5ad224ad19964397b2097a41bd9f54bc93464182c095e96b400cba
+ checksum: 69e0e0623ddf6d1d158d446c1da935b0962df36855e24d6ed480c37af11829ab0995d0a09bb6149f69f6dd88522a991907446c4345d1ea32bdb1add0ffa96e99
languageName: node
linkType: hard
@@ -6076,7 +6076,7 @@ __metadata:
version: 0.0.0-use.local
resolution: "ardrive-upload-service@workspace:."
dependencies:
- "@ardrive/ardrive-promise-cache": ^1.1.4
+ "@ardrive/ardrive-promise-cache": ^1.3.0
"@aws-sdk/abort-controller": ^3.303.0
"@aws-sdk/client-s3": ^3.367.0
"@aws-sdk/client-secrets-manager": ^3.363.0