Skip to content

Commit

Permalink
feat(release): update aea353a
Browse files Browse the repository at this point in the history
  • Loading branch information
fedellen committed Jun 5, 2024
1 parent 8000a99 commit 1822e5a
Show file tree
Hide file tree
Showing 11 changed files with 70,402 additions and 21 deletions.
70,317 changes: 70,317 additions & 0 deletions plan_ids

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions src/arch/db/postgres.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import {
} from "../../types/types";
import { generateArrayChunks } from "../../utils/common";
import {
BundleAlreadySeededWarning,
BundlePlanExistsInAnotherStateWarning,
DataItemExistsWarning,
MultiPartUploadNotFound,
Expand Down Expand Up @@ -570,6 +571,14 @@ export class PostgresDatabase implements Database {
).where({ plan_id: planId });

if (postedBundleDbResult.length === 0) {
// check if its already seeded
const seededBundleDbResult = await this.writer<SeededBundleDBResult>(
tableNames.seededBundle
).where({ plan_id: planId });
if (seededBundleDbResult.length > 0) {
throw new BundleAlreadySeededWarning(seededBundleDbResult[0].bundle_id);
}

throw Error(`No posted_bundle found for plan id ${planId}!`);
}

Expand Down
13 changes: 9 additions & 4 deletions src/jobs/optical-post.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,19 @@ export const opticalPostHandler = async ({
throw Error("OPTICAL_BRIDGE_URL is not set.");
}

const headers: Record<string, string> = {
"x-bundlr-public-key": opticalPubKey,
"Content-Type": "application/json",
};
if (process.env.AR_IO_ADMIN_KEY !== undefined) {
headers["Authorization"] = `Bearer ${process.env.AR_IO_ADMIN_KEY}`;
}

const axios = createAxiosInstance({
retries: 3,
config: {
validateStatus: () => true,
headers: {
"x-bundlr-public-key": opticalPubKey,
"Content-Type": "application/json",
},
headers,
},
});

Expand Down
5 changes: 4 additions & 1 deletion src/jobs/prepare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import defaultLogger from "../logger";
import { PlanId } from "../types/dbTypes";
import { JWKInterface } from "../types/jwkTypes";
import { W } from "../types/winston";
import { filterKeysFromObject } from "../utils/common";
import { filterKeysFromObject, sleep } from "../utils/common";
import { BundlePlanExistsInAnotherStateWarning } from "../utils/errors";
import { getArweaveWallet } from "../utils/getArweaveWallet";
import {
Expand Down Expand Up @@ -180,12 +180,15 @@ export async function prepareBundleHandler(
} catch (error) {
if (isNoSuchKeyS3Error(error)) {
const dataItemId = error.Key.split("/")[1];

// TODO: we need to add refund balance here
await database.updatePlannedDataItemAsFailed({
dataItemId,
failedReason: "missing_from_object_store",
});

// TODO: This is a hack -- recurse to retry the job without the deleted data item
await sleep(100); // Sleep to combat replication lag
return prepareBundleHandler(planId, {
database,
objectStore,
Expand Down
21 changes: 17 additions & 4 deletions src/jobs/seed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import { ObjectStore } from "../arch/objectStore";
import { createQueueHandler } from "../arch/queues";
import { ArweaveInterface } from "../arweaveJs";
import defaultLogger from "../logger";
import { PlanId } from "../types/dbTypes";
import { PlanId, PlannedDataItem, PostedBundle } from "../types/dbTypes";
import { filterKeysFromObject } from "../utils/common";
import { BundleAlreadySeededWarning } from "../utils/errors";
import {
getBundlePayload,
getBundleTx,
Expand All @@ -44,9 +45,21 @@ export async function seedBundleHandler(
}: SeedBundleJobInjectableArch,
logger = defaultLogger.child({ job: "seed-bundle-job", planId })
): Promise<void> {
const dbResult = await database.getNextBundleAndDataItemsToSeedByPlanId(
planId
);
let dbResult: {
bundleToSeed: PostedBundle;
dataItemsToSeed: PlannedDataItem[];
};

try {
dbResult = await database.getNextBundleAndDataItemsToSeedByPlanId(planId);
} catch (error) {
if (error instanceof BundleAlreadySeededWarning) {
logger.warn(error.message);
return;
}
throw error;
}

const { bundleToSeed, dataItemsToSeed } = dbResult;
const { bundleId, transactionByteCount } = bundleToSeed;

Expand Down
8 changes: 8 additions & 0 deletions src/metricRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ export class MetricRegistry {
help: "Count of data items that were found already in the local cache",
});

public static dataItemRemoveCanceledWhenFoundInDb = new promClient.Counter({
name: "data_item_remove_canceled_when_found_in_db_count",
help: "Count of data items that were not removed from object store because they were found in the database",
});

private constructor() {
this.registry = new promClient.Registry();

Expand All @@ -59,6 +64,9 @@ export class MetricRegistry {
this.registry.registerMetric(MetricRegistry.refundBalanceFail);
this.registry.registerMetric(MetricRegistry.usdToArRateFail);
this.registry.registerMetric(MetricRegistry.localCacheDataItemHit);
this.registry.registerMetric(
MetricRegistry.dataItemRemoveCanceledWhenFoundInDb
);
}

public static getInstance(): MetricRegistry {
Expand Down
21 changes: 13 additions & 8 deletions src/routes/dataItemPost.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,13 @@ export async function dataItemRoute(ctx: KoaContext, next: Next) {
};

const requestStartTime = Date.now();
const { objectStore, paymentService, arweaveGateway, getArweaveWallet } =
ctx.state;
const {
objectStore,
paymentService,
arweaveGateway,
getArweaveWallet,
database,
} = ctx.state;

// Validate the content-length header
const contentLength = ctx.req.headers?.["content-length"];
Expand Down Expand Up @@ -291,7 +296,7 @@ export async function dataItemRoute(ctx: KoaContext, next: Next) {
isValid = await streamingDataItem.isValid();
} catch (error) {
removeFromDataItemCache(dataItemId);
void removeDataItem(objectStore, dataItemId); // no need to await - just invoke and forget
void removeDataItem(objectStore, dataItemId, database); // no need to await - just invoke and forget
errorResponse(ctx, {
errorMessage: "Data item parsing error!",
error,
Expand All @@ -302,7 +307,7 @@ export async function dataItemRoute(ctx: KoaContext, next: Next) {
logger.debug(`Got data item validity.`, { isValid });
if (!isValid) {
removeFromDataItemCache(dataItemId);
void removeDataItem(objectStore, dataItemId);
void removeDataItem(objectStore, dataItemId, database);
errorResponse(ctx, {
errorMessage: "Invalid Data Item!",
});
Expand All @@ -315,7 +320,7 @@ export async function dataItemRoute(ctx: KoaContext, next: Next) {

if (totalSize > maxSingleDataItemByteCount) {
removeFromDataItemCache(dataItemId);
void removeDataItem(objectStore, dataItemId);
void removeDataItem(objectStore, dataItemId, database);
errorResponse(ctx, {
errorMessage: `Data item is too large, this service only accepts data items up to ${maxSingleDataItemByteCount} bytes!`,
});
Expand All @@ -332,7 +337,7 @@ export async function dataItemRoute(ctx: KoaContext, next: Next) {
});

removeFromDataItemCache(dataItemId);
void removeDataItem(objectStore, dataItemId); // don't need to await this - just invoke and move on
void removeDataItem(objectStore, dataItemId, database); // don't need to await this - just invoke and move on
return next();
}

Expand Down Expand Up @@ -481,7 +486,7 @@ export async function dataItemRoute(ctx: KoaContext, next: Next) {
});
}
removeFromDataItemCache(dataItemId);
void removeDataItem(objectStore, dataItemId); // don't need to await this - just invoke and move on
void removeDataItem(objectStore, dataItemId, database); // don't need to await this - just invoke and move on

errorResponse(ctx, {
status: 503,
Expand Down Expand Up @@ -531,7 +536,7 @@ export async function dataItemRoute(ctx: KoaContext, next: Next) {
}
// always remove from instance cache
removeFromDataItemCache(dataItemId);
await removeDataItem(objectStore, dataItemId);
await removeDataItem(objectStore, dataItemId, database);

errorResponse(ctx, {
status: 503,
Expand Down
4 changes: 2 additions & 2 deletions src/routes/multiPartUploads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ export async function finalizeMPUWithDataItemInfo({
});
} else {
fnLogger.error(`Failing multipart upload due to insufficient balance.`);
void removeDataItem(objectStore, dataItemInfo.dataItemId); // don't need to await this - just invoke and move on
void removeDataItem(objectStore, dataItemInfo.dataItemId, database); // don't need to await this - just invoke and move on
await database.failFinishedMultiPartUpload({
uploadId,
failedReason: "UNDERFUNDED",
Expand Down Expand Up @@ -1111,7 +1111,7 @@ export async function finalizeMPUWithDataItemInfo({
});
}
if (!dataItemExists) {
void removeDataItem(objectStore, dataItemInfo.dataItemId); // don't need to await this - just invoke and move on
void removeDataItem(objectStore, dataItemInfo.dataItemId, database); // don't need to await this - just invoke and move on
}
throw error;
}
Expand Down
7 changes: 7 additions & 0 deletions src/utils/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,10 @@ export class DataItemsStillPendingWarning extends Error {
this.name = "DataItemsStillPendingWarning";
}
}

export class BundleAlreadySeededWarning extends Error {
constructor(bundleId: string) {
super(`Bundle with ID "${bundleId}" has already been seeded!`);
this.name = "BundleAlreadySeededWarning";
}
}
2 changes: 1 addition & 1 deletion src/utils/getArweaveWallet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ export async function getOpticalPubKey(): Promise<Base64UrlString> {
}
throw new Error(`PubKey unexpectedly undefined or zero length!`);
} catch (error) {
logger.error(
logger.debug(
"Couldn't retrieve pubKey from SSM! Falling back to SecretsMgr",
error
);
Expand Down
16 changes: 15 additions & 1 deletion src/utils/objectStoreUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ import Transaction from "arweave/node/lib/transaction";
import MultiStream from "multistream";
import { EventEmitter, PassThrough, Readable, once, pipeline } from "stream";

import { Database } from "../arch/db/database";
import { ObjectStore } from "../arch/objectStore";
import { S3ObjectStore } from "../arch/s3ObjectStore";
import "../bundles/assembleBundleHeader";
import { bundleHeaderInfoFromBuffer } from "../bundles/assembleBundleHeader";
import { signatureTypeInfo } from "../bundles/verifyDataItem";
import { octetStreamContentType } from "../constants";
import logger from "../logger";
import { MetricRegistry } from "../metricRegistry";
import { PlanId } from "../types/dbTypes";
import { ByteCount, TransactionId, UploadId } from "../types/types";
import { sleep } from "./common";
import { streamToBuffer } from "./streamToBuffer";

const dataItemPrefix = "raw-data-item";
Expand Down Expand Up @@ -457,8 +460,19 @@ export async function getBundlePayload(

export async function removeDataItem(
objectStore: ObjectStore,
dataItemTxId: string
dataItemTxId: string,
database: Database
): Promise<void> {
await sleep(100); // Sleep for 100ms to allow the database to catch up from any replication lag
const dataItemExistsInDb = await database.getDataItemInfo(dataItemTxId);
if (dataItemExistsInDb !== undefined) {
logger.warn(
`Data item ${dataItemTxId} is still referenced in the database. Skipping removal from object store.`
);
MetricRegistry.dataItemRemoveCanceledWhenFoundInDb.inc();
return;
}

return objectStore.removeObject(`${dataItemPrefix}/${dataItemTxId}`);
}

Expand Down

0 comments on commit 1822e5a

Please sign in to comment.