feat: #60 add SalesforceDataTransferApi option to source #65

Merged
13 changes: 13 additions & 0 deletions API.md

Some generated files are not rendered by default.

16 changes: 2 additions & 14 deletions src/salesforce/destination.ts
@@ -5,6 +5,7 @@ SPDX-License-Identifier: Apache-2.0
import { CfnFlow } from 'aws-cdk-lib/aws-appflow';
import { IConstruct } from 'constructs';
import { SalesforceConnectorProfile } from './profile';
import { SalesforceDataTransferApi } from './salesforce-data-transfer-api';
import { SalesforceConnectorType } from './type';
import { AppFlowPermissionsManager } from '../core/appflow-permissions-manager';
import { ConnectorType } from '../core/connectors/connector-type';
@@ -13,19 +14,6 @@ import { IFlow } from '../core/flows';
import { IDestination } from '../core/vertices/destination';
import { WriteOperation } from '../core/write-operation';

/**
* The default. Amazon AppFlow selects which API to use based on the number of records that your flow transfers to Salesforce. If your flow transfers fewer than 1,000 records, Amazon AppFlow uses Salesforce REST API. If your flow transfers 1,000 records or more, Amazon AppFlow uses Salesforce Bulk API 2.0.
*
* Each of these Salesforce APIs structures data differently. If Amazon AppFlow selects the API automatically, be aware that, for recurring flows, the data output might vary from one flow run to the next. For example, if a flow runs daily, it might use REST API on one day to transfer 900 records, and it might use Bulk API 2.0 on the next day to transfer 1,100 records. For each of these flow runs, the respective Salesforce API formats the data differently. Some of the differences include how dates are formatted and null values are represented. Also, Bulk API 2.0 doesn't transfer Salesforce compound fields.
*
* By choosing this option, you optimize flow performance for both small and large data transfers, but the tradeoff is inconsistent formatting in the output.
*/
export enum SalesforceDataTransferApi {

AUTOMATIC = 'AUTOMATIC',
BULKV2 = 'BULKV2',
REST_SYNC = 'REST_SYNC'
}

export interface SalesforceDestinationProps {

@@ -90,4 +78,4 @@ export class SalesforceDestination implements IDestination {
}
}

}
3 changes: 2 additions & 1 deletion src/salesforce/index.ts
@@ -5,4 +5,5 @@ SPDX-License-Identifier: Apache-2.0
export * from './type';
export * from './profile';
export * from './source';
export * from './destination';
export * from './salesforce-data-transfer-api';
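This re-export keeps SalesforceDataTransferApi importable from the package root (the new tests below import it from '../../src'), so extracting the enum out of destination.ts should not break existing consumers of the public API.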
24 changes: 24 additions & 0 deletions src/salesforce/salesforce-data-transfer-api.ts
@@ -0,0 +1,24 @@
/**
 * Specifies which Salesforce API Amazon AppFlow uses when your flow transfers data to or from Salesforce.
 */
export enum SalesforceDataTransferApi {

  /**
   * The default. Amazon AppFlow selects which API to use based on the number of records that your flow transfers to Salesforce. If your flow transfers fewer than 1,000 records, Amazon AppFlow uses Salesforce REST API. If your flow transfers 1,000 records or more, Amazon AppFlow uses Salesforce Bulk API 2.0.
   *
   * Each of these Salesforce APIs structures data differently. If Amazon AppFlow selects the API automatically, be aware that, for recurring flows, the data output might vary from one flow run to the next. For example, if a flow runs daily, it might use REST API on one day to transfer 900 records, and it might use Bulk API 2.0 on the next day to transfer 1,100 records. For each of these flow runs, the respective Salesforce API formats the data differently. Some of the differences include how dates are formatted and null values are represented. Also, Bulk API 2.0 doesn't transfer Salesforce compound fields.
   *
   * By choosing this option, you optimize flow performance for both small and large data transfers, but the tradeoff is inconsistent formatting in the output.
   */
  AUTOMATIC = 'AUTOMATIC',

  /**
   * Amazon AppFlow uses only Salesforce Bulk API 2.0.
   */
  BULKV2 = 'BULKV2',

  /**
   * Amazon AppFlow uses only Salesforce REST API.
   */
  REST_SYNC = 'REST_SYNC'
}
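As the AUTOMATIC documentation warns, letting AppFlow pick the API can yield inconsistently formatted output across runs of a recurring flow; pinning the API avoids that. A minimal sketch, assuming the library is consumed as '@cdklabs/cdk-appflow' and a connector profile named 'my-profile' already exists (both are assumptions, not part of this diff):

```ts
import { Stack } from 'aws-cdk-lib';
import {
  SalesforceConnectorProfile,
  SalesforceDataTransferApi,
  SalesforceDestination,
  WriteOperation,
} from '@cdklabs/cdk-appflow';

const stack = new Stack(undefined, 'ExampleStack');

// Pin Bulk API 2.0 so every flow run formats records the same way.
// Tradeoff noted in the enum docs: Bulk API 2.0 doesn't transfer compound fields.
const destination = new SalesforceDestination({
  profile: SalesforceConnectorProfile.fromConnectionProfileName(stack, 'Profile', 'my-profile'),
  dataTransferApi: SalesforceDataTransferApi.BULKV2,
  object: 'Account',
  operation: WriteOperation.insert(),
});
```

REST_SYNC works the same way on the destination side and is what the tests below exercise.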
10 changes: 9 additions & 1 deletion src/salesforce/source.ts
@@ -5,13 +5,20 @@ SPDX-License-Identifier: Apache-2.0
import { CfnFlow } from 'aws-cdk-lib/aws-appflow';
import { IConstruct } from 'constructs';
import { SalesforceConnectorProfile } from './profile';
import { SalesforceDataTransferApi } from './salesforce-data-transfer-api';
import { SalesforceConnectorType } from './type';
import { ConnectorType } from '../core/connectors/connector-type';
import { IFlow } from '../core/flows';
import { ISource } from '../core/vertices/source';

export interface SalesforceSourceProps {
readonly profile: SalesforceConnectorProfile;

/**
* Specifies which Salesforce API is used by Amazon AppFlow when your flow transfers data from Salesforce.
*/
readonly dataTransferApi?: SalesforceDataTransferApi;

readonly object: string;
readonly apiVersion?: string;
readonly enableDynamicFieldUpdate?: boolean;
@@ -39,6 +46,7 @@ export class SalesforceSource implements ISource {
private buildSourceConnectorProperties(): CfnFlow.SourceConnectorPropertiesProperty {
return {
salesforce: {
dataTransferApi: this.props.dataTransferApi,
enableDynamicFieldUpdate: this.props.enableDynamicFieldUpdate,
includeDeletedRecords: this.props.includeDeletedRecords,
object: this.props.object,
@@ -51,4 +59,4 @@
scope.node.addDependency(resource);
}
}
}
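Because dataTransferApi is passed straight through to the synthesized salesforce source properties, leaving it undefined omits DataTransferApi from the template entirely, preserving the pre-change behavior. A sketch of opting in on the read side, under the same assumptions as the destination example above:

```ts
import { Stack } from 'aws-cdk-lib';
import {
  SalesforceConnectorProfile,
  SalesforceDataTransferApi,
  SalesforceSource,
} from '@cdklabs/cdk-appflow';

const stack = new Stack(undefined, 'ExampleStack');

// Request Bulk API 2.0 explicitly, e.g. for a large extraction where
// consistent Bulk API formatting is preferred over AUTOMATIC selection.
const source = new SalesforceSource({
  profile: SalesforceConnectorProfile.fromConnectionProfileName(stack, 'Profile', 'my-profile'),
  dataTransferApi: SalesforceDataTransferApi.BULKV2,
  object: 'Contact',
});
```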
243 changes: 243 additions & 0 deletions test/salesforce/destination.test.ts
@@ -0,0 +1,243 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
import { Stack } from 'aws-cdk-lib';
import { Template } from 'aws-cdk-lib/assertions';
import { Bucket } from 'aws-cdk-lib/aws-s3';

import { Secret } from 'aws-cdk-lib/aws-secretsmanager';
import {
Mapping,
OnDemandFlow,
S3InputFileType,
S3Source,
SalesforceConnectorProfile,
SalesforceConnectorType,
SalesforceDataTransferApi,
SalesforceDestination,
WriteOperation,
} from '../../src';

describe('SalesforceDestination', () => {

test('Destination with only connector name', () => {
const stack = new Stack(undefined, 'TestStack');
const destination = new SalesforceDestination({
profile: SalesforceConnectorProfile.fromConnectionProfileName(stack, 'TestProfile', 'dummy-profile'),
object: 'Account',
operation: WriteOperation.insert(),
});

const expectedConnectorType = SalesforceConnectorType.instance;
expect(destination.connectorType.asProfileConnectorLabel).toEqual(expectedConnectorType.asProfileConnectorLabel);
expect(destination.connectorType.asProfileConnectorType).toEqual(expectedConnectorType.asProfileConnectorType);
expect(destination.connectorType.asTaskConnectorOperatorOrigin).toEqual(expectedConnectorType.asTaskConnectorOperatorOrigin);
expect(destination.connectorType.isCustom).toEqual(expectedConnectorType.isCustom);
});

test('Destination in a Flow is in the stack', () => {
const stack = new Stack(undefined, 'TestStack');

const s3Bucket = new Bucket(stack, 'TestBucket', {});
const source = new S3Source({
bucket: s3Bucket,
prefix: '',
format: {
type: S3InputFileType.JSON,
},
});

const destination = new SalesforceDestination({
profile: SalesforceConnectorProfile.fromConnectionProfileName(stack, 'TestProfile', 'dummy-profile'),
dataTransferApi: SalesforceDataTransferApi.REST_SYNC,
object: 'Account',
operation: WriteOperation.insert(),
});

new OnDemandFlow(stack, 'TestFlow', {
source: source,
destination: destination,
mappings: [Mapping.mapAll()],
});

Template.fromStack(stack).hasResourceProperties('AWS::AppFlow::Flow', {
DestinationFlowConfigList: [
{
ConnectorProfileName: 'dummy-profile',
ConnectorType: 'Salesforce',
DestinationConnectorProperties: {
Salesforce: {
DataTransferApi: 'REST_SYNC',
Object: 'Account',
WriteOperationType: 'INSERT',
},
},
},
],
FlowName: 'TestFlow',
SourceFlowConfig: {
ConnectorType: 'S3',
SourceConnectorProperties: {
S3: {
BucketName: {
Ref: 'TestBucket560B80BC',
},
BucketPrefix: '',
S3InputFormatConfig: {
S3InputFileType: 'JSON',
},
},
},
},
Tasks: [
{
ConnectorOperator: {
S3: 'NO_OP',
},
SourceFields: [],
TaskProperties: [
{
Key: 'EXCLUDE_SOURCE_FIELDS_LIST',
Value: '[]',
},
],
TaskType: 'Map_all',
},
],
TriggerConfig: {
TriggerType: 'OnDemand',
},
});
});

test('Destination with a newly created profile in a Flow is in the stack', () => {
const stack = new Stack(undefined, 'TestStack');

const secret = Secret.fromSecretNameV2(stack, 'TestSecret', 'appflow/salesforce/client');
const profile = new SalesforceConnectorProfile(stack, 'TestProfile', {
oAuth: {
accessToken: 'accessToken',
flow: {
refreshTokenGrant: {
refreshToken: 'refreshToken',
client: secret,
},
},
},
instanceUrl: 'https://instance-id.develop.my.salesforce.com',
});

const s3Bucket = new Bucket(stack, 'TestBucket', {});
const source = new S3Source({
bucket: s3Bucket,
prefix: '',
format: {
type: S3InputFileType.JSON,
},
});

const destination = new SalesforceDestination({
profile: profile,
dataTransferApi: SalesforceDataTransferApi.REST_SYNC,
object: 'Account',
operation: WriteOperation.insert(),
});

new OnDemandFlow(stack, 'TestFlow', {
source: source,
destination: destination,
mappings: [Mapping.mapAll()],
});

const template = Template.fromStack(stack);
template.hasResourceProperties('AWS::AppFlow::ConnectorProfile', {
ConnectionMode: 'Public',
ConnectorProfileConfig: {
ConnectorProfileCredentials: {
Salesforce: {
AccessToken: 'accessToken',
ClientCredentialsArn: {
'Fn::Join': [
'',
[
'arn:',
{
Ref: 'AWS::Partition',
},
':secretsmanager:',
{
Ref: 'AWS::Region',
},
':',
{
Ref: 'AWS::AccountId',
},
':secret:appflow/salesforce/client',
],
],
},
RefreshToken: 'refreshToken',
},
},
ConnectorProfileProperties: {
Salesforce: {
InstanceUrl: 'https://instance-id.develop.my.salesforce.com',
},
},
},
ConnectorProfileName: 'TestProfile',
ConnectorType: 'Salesforce',
});

template.hasResourceProperties('AWS::AppFlow::Flow', {
DestinationFlowConfigList: [
{
ConnectorProfileName: 'TestProfile',
ConnectorType: 'Salesforce',
DestinationConnectorProperties: {
Salesforce: {
DataTransferApi: 'REST_SYNC',
Object: 'Account',
WriteOperationType: 'INSERT',
},
},
},
],
FlowName: 'TestFlow',
SourceFlowConfig: {
ConnectorType: 'S3',
SourceConnectorProperties: {
S3: {
BucketName: {
Ref: 'TestBucket560B80BC',
},
BucketPrefix: '',
S3InputFormatConfig: {
S3InputFileType: 'JSON',
},
},
},
},
Tasks: [
{
ConnectorOperator: {
S3: 'NO_OP',
},
SourceFields: [],
TaskProperties: [
{
Key: 'EXCLUDE_SOURCE_FIELDS_LIST',
Value: '[]',
},
],
TaskType: 'Map_all',
},
],
TriggerConfig: {
TriggerType: 'OnDemand',
},
});
});
});