Skip to content

Commit

Permalink
chore: address final TODOs
Browse files Browse the repository at this point in the history
  • Loading branch information
cristiand391 committed Nov 6, 2024
1 parent 07c944a commit 3f72588
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 58 deletions.
77 changes: 47 additions & 30 deletions src/bulkIngest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import * as fs from 'node:fs';
import { platform } from 'node:os';
import { Flags, Ux } from '@salesforce/sf-plugins-core';
import { Flags, SfCommand, Ux } from '@salesforce/sf-plugins-core';
import { IngestJobV2, IngestJobV2FailedResults, JobInfoV2 } from '@jsforce/jsforce-node/lib/api/bulk2.js';
import { Connection, Messages, SfError } from '@salesforce/core';
import { Schema } from '@jsforce/jsforce-node';
Expand Down Expand Up @@ -55,31 +55,28 @@ export async function bulkIngest(opts: {
jsonEnabled: boolean;
verbose: boolean;
logFn: (message: string) => void;
warnFn: (message: SfCommand.Warning) => void;
}): Promise<BulkIngestInfo> {
const {
conn,
operation,
object,
lineEnding = platform() === 'win32' ? 'CRLF' : 'LF',
columnDelimiter,
file,
logFn,
} = opts;
const { conn, operation, object, lineEnding = platform() === 'win32' ? 'CRLF' : 'LF', file, logFn } = opts;

// validation
if (opts.externalId && opts.operation !== 'upsert') {
// TODO: update error msg
throw new SfError('yadayadayda');
throw new SfError('External ID is only required for `sf data upsert bulk.');
}

if (opts.verbose && !['delete', 'hardDelete', 'upsert'].includes(opts.operation)) {
// TODO: update error msg
throw new SfError('yadayadayda');
throw new SfError(
'Verbose mode is limited for `sf data delete/upsert bulk` for backwards-compat only and will be removed after March 2025.'
);
}

const timeout = opts.async ? Duration.minutes(0) : opts.wait ?? Duration.minutes(0);
const async = timeout.milliseconds === 0;

// CSV file for `delete/HardDelete` operations only have 1 column (ID), we set it to `COMMA` if not specified but any delimiter works.
const columnDelimiter =
opts.columnDelimiter ?? (['delete', 'hardDelete'].includes(operation) ? 'COMMA' : await detectDelimiter(file));

const baseUrl = ensureString(opts.conn.getAuthInfoFields().instanceUrl);

const stages = new BulkIngestStages({
Expand All @@ -97,8 +94,7 @@ export async function bulkIngest(opts: {
operation,
lineEnding,
externalIdFieldName: opts.externalId,
columnDelimiter:
columnDelimiter ?? (['delete', 'hardDelete'].includes(operation) ? 'COMMA' : await detectDelimiter(file)),
columnDelimiter,
}).catch((err) => {
stages.stop('failed');
throw err;
Expand All @@ -123,9 +119,7 @@ export async function bulkIngest(opts: {
operation,
lineEnding,
externalIdFieldName: opts.externalId,
// TODO: inline this at the top-level, CSV for deletes only have one column so we can't detect the delimiter
columnDelimiter:
columnDelimiter ?? (['delete', 'hardDelete'].includes(operation) ? 'COMMA' : await detectDelimiter(file)),
columnDelimiter,
}).catch((err) => {
stages.stop('failed');
throw err;
Expand All @@ -134,6 +128,20 @@ export async function bulkIngest(opts: {
stages.setupJobListeners(job);
stages.processingJob();

// always create a cache for `bulk upsert/delete` (even if not async).
// This keeps backwards-compat with the previous cache resolver that always returned
// a valid cache entry even if the ID didn't exist in it and allowed the scenario below:
//
// `sf data delete bulk --wait 10` -> sync operation (successful or not) never created a cache
// `sf data delete resume -i <job-id>` worked b/c the cache resolver returned the ID as a cache entry
// `sf data delete resume --use-most-recent` was never supported for sync runs.
if (['upsert', 'delete', 'hardDelete'].includes(operation)) {
opts.warnFn(
'Resuming a synchronous operation via `sf data upsert/delete resume` will not be supported after March 2025.'
);
await opts.cache.createCacheEntryForRequest(job.id, ensureString(conn.getUsername()), conn.getApiVersion());
}

try {
await job.poll(5000, timeout.milliseconds);

Expand All @@ -150,15 +158,19 @@ export async function bulkIngest(opts: {
printBulkErrors(records);
}
}
// TODO: deprecate this and point users to `data bulk results`
if (['delete', 'hardDelete', 'upsert'].includes(opts.operation)) {

if (['delete', 'hardDelete', 'upsert'].includes(opts.operation) && opts.jsonEnabled) {
opts.warnFn(
'Record failures will not be included in JSON output after March 2025, use `sf data bulk results` to get results instead.'
);
return {
jobId: jobInfo.id,
processedRecords: jobInfo.numberRecordsProcessed,
successfulRecords: jobInfo.numberRecordsProcessed - (jobInfo.numberRecordsFailed ?? 0),
failedRecords: jobInfo.numberRecordsFailed,
};
}

throw messages.createError(
'error.failedRecordDetails',
[jobInfo.numberRecordsFailed],
Expand All @@ -170,15 +182,6 @@ export async function bulkIngest(opts: {

stages.stop();

// always create a cache for `bulk upsert/delete` (even if not async).
// This keeps backwards-compat with the previous cache resolver that always returned
// a valid cache entry even if the ID didn't exist in it.
//
// TODO: talk with Vivek if we can deprecate this odd behavior.
if (['upsert', 'delete', 'hardDelete'].includes(operation)) {
await opts.cache.createCacheEntryForRequest(job.id, ensureString(conn.getUsername()), conn.getApiVersion());
}

return {
jobId: jobInfo.id,
processedRecords: jobInfo.numberRecordsProcessed,
Expand Down Expand Up @@ -226,6 +229,7 @@ export async function bulkIngestResume(opts: {
jobIdOrMostRecent: string | boolean;
jsonEnabled: boolean;
wait: Duration;
warnFn: (message: SfCommand.Warning) => void;
}): Promise<BulkIngestInfo> {
const resumeOpts = await opts.cache.resolveResumeOptionsFromCache(opts.jobIdOrMostRecent);

Expand Down Expand Up @@ -256,6 +260,19 @@ export async function bulkIngestResume(opts: {

if (jobInfo.numberRecordsFailed) {
stages.error();

if (['delete', 'hardDelete', 'upsert'].includes(jobInfo.operation) && opts.jsonEnabled) {
opts.warnFn(
'Record failures will not be included in JSON output after March 2025, use `sf data bulk results` to get results instead.'
);
return {
jobId: jobInfo.id,
processedRecords: jobInfo.numberRecordsProcessed,
successfulRecords: jobInfo.numberRecordsProcessed - (jobInfo.numberRecordsFailed ?? 0),
failedRecords: jobInfo.numberRecordsFailed,
};
}

throw messages.createError(
'error.failedRecordDetails',
[jobInfo.numberRecordsFailed],
Expand Down
9 changes: 6 additions & 3 deletions src/commands/data/delete/bulk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import { Messages } from '@salesforce/core';
import { Flags, SfCommand } from '@salesforce/sf-plugins-core';
import { baseUpsertDeleteFlags, bulkIngest, columnDelimiterFlag, lineEndingFlag } from '../../../bulkIngest.js';
import { baseUpsertDeleteFlags, columnDelimiterFlag, lineEndingFlag, bulkIngest } from '../../../bulkIngest.js';
import { BulkDeleteRequestCache } from '../../../bulkDataRequestCache.js';
import { BulkResultV2 } from '../../../types.js';
import { transformResults } from '../../../bulkUtils.js';
Expand Down Expand Up @@ -48,8 +48,11 @@ export default class Delete extends SfCommand<BulkResultV2> {
file: flags.file,
jsonEnabled: this.jsonEnabled(),
verbose: flags.verbose,
logFn: (...args) => {
this.log(...args);
logFn: (arg: string) => {
this.log(arg);
},
warnFn: (arg: SfCommand.Warning) => {
this.warn(arg);
},
});

Expand Down
5 changes: 4 additions & 1 deletion src/commands/data/delete/resume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { Messages } from '@salesforce/core';
import { SfCommand } from '@salesforce/sf-plugins-core';
import type { BulkResultV2 } from '../../../types.js';
import { BulkDeleteRequestCache } from '../../../bulkDataRequestCache.js';
import { ResumeBulkCommand } from '../../../resumeBulkBaseCommand.js';
Expand All @@ -26,12 +27,14 @@ export default class DeleteResume extends ResumeBulkCommand {

const res = await bulkIngestResume({
cmdId: 'data delete resume',
// TODO: should be `Deleting` or `HardDeleting`
stageTitle: 'Deleting data',
cache,
jobIdOrMostRecent: flags['job-id'] ?? flags['use-most-recent'],
jsonEnabled: this.jsonEnabled(),
wait: flags.wait,
warnFn: (arg: SfCommand.Warning) => {
this.warn(arg);
},
});

const {
Expand Down
7 changes: 5 additions & 2 deletions src/commands/data/import/bulk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ export default class DataImportBulk extends SfCommand<DataImportBulkResult> {
file: flags.file,
jsonEnabled: this.jsonEnabled(),
verbose: false,
logFn: (...args) => {
this.log(...args);
logFn: (arg: string) => {
this.log(arg);
},
warnFn: (arg: SfCommand.Warning) => {
this.warn(arg);
},
});
}
Expand Down
3 changes: 3 additions & 0 deletions src/commands/data/import/resume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ export default class DataImportResume extends SfCommand<DataImportResumeResult>
jobIdOrMostRecent: flags['job-id'] ?? flags['use-most-recent'],
jsonEnabled: this.jsonEnabled(),
wait: flags.wait,
warnFn: (arg: SfCommand.Warning) => {
this.warn(arg);
},
});
}
}
23 changes: 5 additions & 18 deletions src/commands/data/query/resume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ export class BulkQueryReport extends SfCommand<unknown> {
const resumeOptions = await cache.resolveResumeOptionsFromCache(flags['bulk-query-id'] ?? flags['use-most-recent']);
const job = new QueryJobV2(resumeOptions.options.connection, {
id: resumeOptions.jobInfo.id,
pollingOptions: getNonZeroTimeoutPollingOptions({
pollTimeout: 0,
pollInterval: 0,
}),
pollingOptions: {
pollTimeout: 30_000,
pollInterval: 1000,
},
});
await job.poll();
const results = await job.result();
// TODO: I think `resumeOptions.options.query` used to be hardcoded to `query`, maybe remove it?
// we don't have access to the SOQL query here so we pass an empty string.
const queryResult = transformBulkResults((await results.toArray()) as jsforceRecord[], '');

if (!this.jsonEnabled()) {
Expand All @@ -73,16 +73,3 @@ export class BulkQueryReport extends SfCommand<unknown> {
return queryResult.result;
}
}

// TODO: I'm pretty sure we never saved `--wait` value in cache, see if this still makes sense after cache refactor
/**
* polling options are retrieved from the cache.
* If the data:query used `--async` or `--wait` 0, we'd be passing that to the jsforce poll method,
* which means it would never check the actual result, and always throw a timeout error */
const getNonZeroTimeoutPollingOptions = (pollingOptions: {
pollInterval: number;
pollTimeout: number;
}): { pollInterval: number; pollTimeout: number } => ({
...pollingOptions,
pollTimeout: Math.max(pollingOptions.pollTimeout, 1000),
});
7 changes: 5 additions & 2 deletions src/commands/data/update/bulk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ export default class DataUpdateBulk extends SfCommand<DataUpdateBulkResult> {
file: flags.file,
jsonEnabled: this.jsonEnabled(),
verbose: false,
logFn: (...args) => {
this.log(...args);
logFn: (arg: string) => {
this.log(arg);
},
warnFn: (arg: SfCommand.Warning) => {
this.warn(arg);
},
});
}
Expand Down
3 changes: 3 additions & 0 deletions src/commands/data/update/resume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ export default class DataUpdateResume extends SfCommand<DataUpdateResumeResult>
jobIdOrMostRecent: flags['job-id'] ?? flags['use-most-recent'],
jsonEnabled: this.jsonEnabled(),
wait: flags.wait,
warnFn: (arg: SfCommand.Warning) => {
this.warn(arg);
},
});
}
}
7 changes: 5 additions & 2 deletions src/commands/data/upsert/bulk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ export default class Upsert extends SfCommand<BulkResultV2> {
file: flags.file,
jsonEnabled: this.jsonEnabled(),
verbose: flags.verbose,
logFn: (...args) => {
this.log(...args);
logFn: (arg: string) => {
this.log(arg);
},
warnFn: (arg: SfCommand.Warning) => {
this.warn(arg);
},
});

Expand Down
4 changes: 4 additions & 0 deletions src/commands/data/upsert/resume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { Messages } from '@salesforce/core';
import { SfCommand } from '@salesforce/sf-plugins-core';
import type { BulkResultV2 } from '../../../types.js';
import { BulkUpsertRequestCache } from '../../../bulkDataRequestCache.js';
import { ResumeBulkCommand } from '../../../resumeBulkBaseCommand.js';
Expand All @@ -30,6 +31,9 @@ export default class UpsertResume extends ResumeBulkCommand {
jobIdOrMostRecent: flags['job-id'] ?? flags['use-most-recent'],
jsonEnabled: this.jsonEnabled(),
wait: flags.wait,
warnFn: (arg: SfCommand.Warning) => {
this.warn(arg);
},
});

const {
Expand Down

0 comments on commit 3f72588

Please sign in to comment.