Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dataprep sqlx support #1891

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cli/api/utils/graphs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import { dataform } from "df/protos/ts";

export const combineAllActions = (graph: dataform.ICompiledGraph) => {
return ([] as Array<
dataform.ITable | dataform.IOperation | dataform.IAssertion | dataform.IDeclaration
dataform.ITable | dataform.IOperation | dataform.IAssertion | dataform.IDeclaration | dataform.IDataPreparation
>).concat(
graph.tables || ([] as dataform.ITable[]),
graph.operations || ([] as dataform.IOperation[]),
graph.assertions || ([] as dataform.IAssertion[]),
graph.declarations || ([] as dataform.IDeclaration[])
graph.declarations || ([] as dataform.IDeclaration[]),
graph.dataPreparations || ([] as dataform.IDataPreparation[])
);
};

Expand Down
268 changes: 233 additions & 35 deletions core/actions/data_preparation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,46 @@ import { dump as dumpYaml } from "js-yaml";

import { verifyObjectMatchesProto, VerifyProtoErrorBehaviour } from "df/common/protos";
import { ActionBuilder } from "df/core/actions";
import { Resolvable } from "df/core/common";
import { ITableContext } from "df/core/actions/index";
import {
Contextable,
Resolvable
} from "df/core/common";
import * as Path from "df/core/path";
import { Session } from "df/core/session";
import {
actionConfigToCompiledGraphTarget,
addDependenciesToActionDependencyTargets,
configTargetToCompiledGraphTarget,
nativeRequire,
resolveActionsConfigFilename
resolvableAsTarget,
resolveActionsConfigFilename,
setNameAndTarget,
toResolvable,
validateQueryString
} from "df/core/utils";
import { dataform } from "df/protos/ts";
import {filename} from 'df/core/path';

// Enum for Load configuration settings
export const LoadType = ["replace", "append", "maximum", "unique", "automatic"] as const;
export type LoadType = typeof LoadType[number];

// Properties for load configuration, including the column name for MAXIMUM and UNIQUE load configurations.
export interface ILoadConfig {
type: LoadType;
column?: string;
}


/**
* @hidden
*/
export class DataPreparation extends ActionBuilder<dataform.DataPreparation> {
public session: Session;

// We delay contextification until the final compile step, so hold these here for now.
public contextableQuery: Contextable<IDataPreparationContext, string>;

// TODO: make this field private, to enforce proto update logic to happen in this class.
public proto: dataform.IDataPreparation = dataform.DataPreparation.create();

Expand All @@ -46,43 +68,29 @@ export class DataPreparation extends ActionBuilder<dataform.DataPreparation> {
}
}

config.filename = resolveActionsConfigFilename(config.filename, configPath);
const dataPreparationAsJson = nativeRequire(config.filename).asJson;

// Find targets
const targets = this.getTargets(
dataPreparationAsJson as {
[key: string]: any;
}
);

// if there are targets in the data preparation, resolve and set targets.
// Otherwise, treat this as an empty data preparation.
if (targets.length > 0) {
this.resolveTargets(targets, dataPreparationAsJson, session, config);
} else {
// Handle the case where the data preparation is empty
this.resolveEmpty(dataPreparationAsJson, session, config);
}

this.proto.tags = config.tags;
if (config.dependencyTargets) {
this.dependencies(
config.dependencyTargets.map(dependencyTarget =>
configTargetToCompiledGraphTarget(dataform.ActionConfig.Target.create(dependencyTarget))
)
);
}
this.proto.fileName = config.filename;
if (config.disabled) {
this.proto.disabled = config.disabled;
const extension = Path.fileExtension(config.filename);
if (extension === "yaml") {
this.configureYaml(session, config, configPath);
} else if (extension === "sqlx") {
this.configureSqlx(session, config);
}
}

/**
* @hidden
*/
public config(config: any) {
if (config.database) {
this.database(config.database);
}
if (config.schema) {
this.schema(config.schema);
}
return this;
}

public query(query: Contextable<IDataPreparationContext, string>) {
this.contextableQuery = query;
return this;
}

Expand Down Expand Up @@ -113,14 +121,42 @@ export class DataPreparation extends ActionBuilder<dataform.DataPreparation> {
}

public compile() {
if (this.contextableQuery != null) {
const context = new DataPreparationContext(this);
this.proto.query = context.apply(this.contextableQuery).trim();
validateQueryString(this.session, this.proto.query, this.proto.fileName);
}

return verifyObjectMatchesProto(
dataform.DataPreparation,
this.proto,
VerifyProtoErrorBehaviour.SUGGEST_REPORTING_TO_DATAFORM_TEAM
);
}

private resolveEmpty(
public database(database: string) {
setNameAndTarget(
this.session,
this.proto,
this.proto.target.name,
this.proto.target.schema,
database
);
return this;
}

public schema(schema: string) {
setNameAndTarget(
this.session,
this.proto,
this.proto.target.name,
schema,
this.proto.target.database
);
return this;
}

private configureYamlWithoutTargets(
dataPreparationAsJson: {
[key: string]: any;
},
Expand All @@ -147,7 +183,7 @@ export class DataPreparation extends ActionBuilder<dataform.DataPreparation> {
this.proto.dataPreparationYaml = dumpYaml(resolvedDefinition);
}

private resolveTargets(
private configureYamlWithTargets(
targets: dataform.Target[],
dataPreparationAsJson: {
[key: string]: any;
Expand Down Expand Up @@ -265,4 +301,166 @@ export class DataPreparation extends ActionBuilder<dataform.DataPreparation> {

return targets;
}
private configureYaml(
session?: Session,
config?: dataform.ActionConfig.DataPreparationConfig,
configPath?: string) {
config.filename = resolveActionsConfigFilename(config.filename, configPath);
const dataPreparationAsJson = nativeRequire(config.filename).asJson;

// Find targets
const targets = this.getTargets(
dataPreparationAsJson as {
[key: string]: any;
}
);

// if there are targets in the data preparation, resolve and set targets.
// Otherwise, treat this as an empty data preparation.
if (targets.length > 0) {
this.configureYamlWithTargets(targets, dataPreparationAsJson, session, config);
} else {
// Handle the case where the data preparation is empty
this.configureYamlWithoutTargets(dataPreparationAsJson, session, config);
}

this.proto.tags = config.tags;
if (config.dependencyTargets) {
this.dependencies(
config.dependencyTargets.map(dependencyTarget =>
configTargetToCompiledGraphTarget(dataform.ActionConfig.Target.create(dependencyTarget))
)
);
}
this.proto.fileName = config.filename;
if (config.disabled) {
this.proto.disabled = config.disabled;
}
}

private configureSqlx(
session?: Session,
config?: dataform.ActionConfig.DataPreparationConfig) {
const targets: dataform.Target[] = [];

// Add destination as target
targets.push(actionConfigToCompiledGraphTarget(config));
// Add Error Table if specified as a secondary target
if (config.errorTable != null) {
const errorTableConfig = dataform.ActionConfig.DataPreparationConfig.ErrorTableConfig.create(config.errorTable);
const errorTableTarget = actionConfigToCompiledGraphTarget(errorTableConfig);

this.proto.errorTableRetentionDays = errorTableConfig.retentionDays
targets.push(errorTableTarget);
}

// Resolve targets
this.proto.targets = targets.map(target =>
this.applySessionToTarget(target, session.projectConfig, config.filename, true)
).map(target =>
this.finalizeTarget(target)
);

// Add target and error table to proto
this.proto.target = this.proto.targets[0];
if (this.proto.targets.length > 1) {
this.proto.errorTable = this.proto.targets[1];
}

// resolve canonical targets
this.proto.canonicalTargets = targets.map(target =>
this.applySessionToTarget(target, session.canonicalProjectConfig)
);
this.proto.canonicalTarget = this.proto.canonicalTargets[0];

if (config.dependencyTargets) {
this.dependencies(
config.dependencyTargets.map(dependencyTarget =>
configTargetToCompiledGraphTarget(dataform.ActionConfig.Target.create(dependencyTarget))
)
);
}

this.proto.fileName = config.filename
this.query(nativeRequire(config.filename).query);
kolina marked this conversation as resolved.
Show resolved Hide resolved
}
}

export interface IDataPreparationContext extends ITableContext {
/**
* Shorthand for a validation SQL expression. This converts the parameters
* into a validation call supported by Data Preparation.
*/
validate: (exp: string) => string;
}

export class DataPreparationContext implements IDataPreparationContext {
constructor(private dataPreparation: DataPreparation, private isIncremental = false) {}

public config(config: dataform.ActionConfig.DataPreparationConfig.ErrorTableConfig) {
this.dataPreparation.config(config);
return "";
}

public self(): string {
return this.resolve(this.dataPreparation.proto.target);
}

public name(): string {
return this.dataPreparation.session.finalizeName(this.dataPreparation.proto.target.name);
}

public ref(ref: Resolvable | string[], ...rest: string[]): string {
ref = toResolvable(ref, rest);
if (!resolvableAsTarget(ref)) {
this.dataPreparation.session.compileError(new Error(`Action name is not specified`));
return "";
}
this.dataPreparation.dependencies(ref);
return this.resolve(ref);
}

public resolve(ref: Resolvable | string[], ...rest: string[]) {
return this.dataPreparation.session.resolve(ref, ...rest);
}

public schema(): string {
return this.dataPreparation.session.finalizeSchema(this.dataPreparation.proto.target.schema);
}

public database(): string {
if (!this.dataPreparation.proto.target.database) {
this.dataPreparation.session.compileError(new Error(`Warehouse does not support multiple databases`));
return "";
}

return this.dataPreparation.session.finalizeDatabase(this.dataPreparation.proto.target.database);
}

// TODO: Add support for incremental conditions in compilation output
public when(cond: boolean, trueCase: string, falseCase: string = "") {
return cond ? trueCase : falseCase;
}

// TODO: Add support for incremental conditions in compilation output
public incremental() {
return !!this.isIncremental;
}

public dependencies(res: Resolvable) {
this.dataPreparation.dependencies(res);
return "";
}

public validate(exp: string): string {
return `-- @@VALIDATION\n|> WHERE IF(${exp},true,ERROR(\"Validation Failed\"))`;
}

public apply<T>(value: Contextable<IDataPreparationContext, T>): T {
if (typeof value === "function") {
return (value as any)(this);
} else {
return value;
}
}
}
3 changes: 2 additions & 1 deletion core/compilers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ const CONTEXT_FUNCTIONS = [
"when",
"incremental",
"schema",
"database"
"database",
"validate"
]
.map(name => `const ${name} = ctx.${name} ? ctx.${name}.bind(ctx) : undefined;`)
.join("\n");
Expand Down
Loading