Skip to content

Commit

Permalink
fix updateBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
prostgles committed Nov 30, 2024
1 parent 0e2d9c7 commit 12ccc3d
Show file tree
Hide file tree
Showing 22 changed files with 132 additions and 103 deletions.
5 changes: 3 additions & 2 deletions lib/Auth/AuthHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,12 @@ export class AuthHandler {
}

const userData = await this.getClientInfo(clientReq);
const { email } = this.opts?.expressConfig?.registrations ?? {};
const auth: AuthSocketSchema = {
providers: getProviders.bind(this)(),
register: this.opts?.expressConfig?.registrations?.email && { type: this.opts?.expressConfig?.registrations?.email.signupType, url: AUTH_ROUTES_AND_PARAMS.emailSignup },
register: email && { type: email.signupType, url: AUTH_ROUTES_AND_PARAMS.emailSignup },
user: userData?.clientUser,
loginType: this.opts?.expressConfig?.registrations?.email?.signupType ?? "withPassword",
loginType: email?.signupType ?? "withPassword",
pathGuard,
};
return { auth, userData };
Expand Down
2 changes: 1 addition & 1 deletion lib/DboBuilder/DboBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ export class DboBuilder {
}

runSQL = async (query: string, params: any, options: SQLOptions | undefined, localParams?: LocalParams) => {
return runSQL.bind(this)(query, params, options, localParams).catch(error => Promise.reject(getSerializedClientErrorFromPGError(error, { type: "sql" })));
return runSQL.bind(this)(query, params, options, localParams).catch(error => Promise.reject(getSerializedClientErrorFromPGError(error, { type: "sql", localParams })));
}

canSubscribe = false;
Expand Down
3 changes: 2 additions & 1 deletion lib/DboBuilder/DboBuilderTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import * as pgPromise from 'pg-promise';
import {
AnyObject,
ClientSchema,
ColumnInfo,
DbJoinMaker,
EXISTS_KEY,
Expand Down Expand Up @@ -189,7 +190,7 @@ export type PRGLIOSocket = {
_user?: AnyObject

/** Used for publish error caching */
prostgles?: AnyObject;
prostgles?: ClientSchema;
};

export type LocalParams = {
Expand Down
2 changes: 1 addition & 1 deletion lib/DboBuilder/QueryStreamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export class QueryStreamer {
if(errored) return;
errored = true;

const errorWithoutQuery = getSerializedClientErrorFromPGError(rawError, { type: "sql" });
const errorWithoutQuery = getSerializedClientErrorFromPGError(rawError, { type: "sql", localParams: { socket } });
// For some reason query is not present on the error object from sql stream mode
const error = { ...errorWithoutQuery, query: query.query };
socket.emit(channel, { type: "error", error } satisfies SocketSQLStreamPacket);
Expand Down
2 changes: 1 addition & 1 deletion lib/DboBuilder/TableHandler/TableHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export class TableHandler extends ViewHandler {

async delete(filter?: Filter, params?: DeleteParams, param3_unused?: undefined, table_rules?: TableRule, localParams?: LocalParams): Promise<any> {
return _delete.bind(this)(filter, params, param3_unused, table_rules, localParams);
}
}

remove(filter: Filter, params?: UpdateParams, param3_unused?: undefined, tableRules?: TableRule, localParams?: LocalParams) {
return this.delete(filter, params, param3_unused, tableRules, localParams);
Expand Down
4 changes: 2 additions & 2 deletions lib/DboBuilder/TableHandler/updateBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Filter, LocalParams, getClientErrorFromPGError, getErrorAsObject, getSe
import { TableHandler } from "./TableHandler";


export async function updateBatch(this: TableHandler, updates: [Filter, AnyObject][], params?: UpdateParams, tableRules?: TableRule, localParams?: LocalParams): Promise<any> {
export async function updateBatch(this: TableHandler, updates: [Filter, AnyObject][], params?: UpdateParams, _?: undefined, tableRules?: TableRule, localParams?: LocalParams): Promise<any> {
const start = Date.now();
try {
const { checkFilter, postValidate } = tableRules?.update ?? {};
Expand Down Expand Up @@ -38,7 +38,7 @@ export async function updateBatch(this: TableHandler, updates: [Filter, AnyObjec
const result = await this.db.tx(t => {
return t.none(queries.join(";\n"));
})
.catch(err => getClientErrorFromPGError(err, { type: "tableMethod", localParams, view: this, allowedKeys: []}));
.catch(err => getClientErrorFromPGError(err, { type: "tableMethod", localParams, view: this, allowedKeys: [] }));

await this._log({ command: "updateBatch", localParams, data: { data: updates, params }, duration: Date.now() - start });
return result;
Expand Down
7 changes: 3 additions & 4 deletions lib/DboBuilder/TableHandler/upsert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ export const upsert = async function(this: TableHandler, filter: Filter, newData
/* Do it within a transaction to ensure consisency */
if (!this.tx) {
return this.dboBuilder.getTX(dbTX => _upsert(dbTX[this.name] as TableHandler))
} else {
const result = await _upsert(this);
await this._log({ command: "upsert", localParams, data: { filter, newData, params }, duration: Date.now() - start });
return result;
}
const result = await _upsert(this);
await this._log({ command: "upsert", localParams, data: { filter, newData, params }, duration: Date.now() - start });
return result;

} catch (e) {
await this._log({ command: "upsert", localParams, data: { filter, newData, params }, duration: Date.now() - start, error: getErrorAsObject(e) });
Expand Down
4 changes: 2 additions & 2 deletions lib/DboBuilder/ViewHandler/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type RunQueryReturnTypeArgs = {
export const runQueryReturnType = async ({ newQuery, handler, localParams, queryWithRLS, queryWithoutRLS, returnType,}: RunQueryReturnTypeArgs) => {

const query = queryWithRLS;
const sqlTypes = ["statement", "statement-no-rls", "statement-where"];
const sqlTypes = ["statement", "statement-no-rls", "statement-where"] as const;
if(!returnType || returnType === "values"){

return handler.dbHandler.any(query).then(data => {
Expand All @@ -134,7 +134,7 @@ export const runQueryReturnType = async ({ newQuery, handler, localParams, query

} else if (sqlTypes.some(v => v === returnType)) {
if (!(await canRunSQL(handler.dboBuilder.prostgles, localParams))) {
throw `Not allowed: {returnType: ${JSON.stringify(returnType)}} requires execute sql privileges `
throw `Not allowed: { returnType: ${JSON.stringify(returnType)} } requires execute sql privileges `
}
if(returnType === "statement-no-rls"){
return queryWithoutRLS as any;
Expand Down
26 changes: 15 additions & 11 deletions lib/DboBuilder/dboBuilderUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,22 @@ export const getErrorAsObject = (rawError: any, includeStack = false) => {

type GetSerializedClientErrorFromPGErrorArgs = {
type: "sql";
localParams: LocalParams | undefined;
} | {
type: "tableMethod";
localParams: LocalParams | undefined;
view: ViewHandler | Partial<TableHandler> | undefined;
allowedKeys?: string[];
canRunSql?: boolean;
} | {
type: "method";
localParams: LocalParams | undefined;
allowedKeys?: string[];
view?: undefined;
canRunSql?: boolean;
}
};

const sensitiveErrorKeys = ["hint", "detail", "context"] as const;
const otherKeys = ["column", "code", "code_info", "table", "constraint", "severity", "message", "name"] as const;

export function getSerializedClientErrorFromPGError(rawError: any, args: GetSerializedClientErrorFromPGErrorArgs): AnyObject {
const err = getErrorAsObject(rawError);
if(err.code) {
Expand All @@ -71,18 +74,19 @@ export function getSerializedClientErrorFromPGError(rawError: any, args: GetSeri
console.trace(err)
}

const isServerSideRequest = args.type !== "sql" && !args.localParams;
if(args.type === "sql" || isServerSideRequest){
const isServerSideRequest = !args.localParams;
//TODO: add a rawSQL check for HTTP requests
const showFullError = isServerSideRequest || args.type === "sql" || args.localParams?.socket?.prostgles?.rawSQL;
if(showFullError){
return err;
}
const { view, allowedKeys } = args;


const sensitiveErrorKeys = ["hint", "detail", "context"];
const otherKeys = ["column", "code", "code_info", "table", "constraint", "severity", "message", "name"];
const finalKeys = otherKeys
.concat(allowedKeys ?? [])
.concat(args.canRunSql? sensitiveErrorKeys : []);
const finalKeys = [
...otherKeys,
...(allowedKeys ?? []),
...(showFullError? sensitiveErrorKeys : [])
];

const errObject = pickKeys(err, finalKeys);
if (view?.dboBuilder?.constraints && errObject.constraint && !errObject.column) {
Expand Down
18 changes: 7 additions & 11 deletions lib/Prostgles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ export type PGP = pgPromise.IMain<{}, pg.IClient>;
import {
CHANNELS,
ClientSchema,
DBSchemaTable,
SQLRequest, TableSchemaForClient,
SQLRequest,
isObject, omitKeys, tryCatch
} from "prostgles-types";
import { DBEventsManager } from "./DBEventsManager";
import { PublishParser } from "./PublishParser/PublishParser";
export { sendEmail, getOrSetTransporter } from "./Auth/sendEmail";
export { getOrSetTransporter, sendEmail } from "./Auth/sendEmail";

export type DB = pgPromise.IDatabase<{}, pg.IClient>;
export type DBorTx = DB | pgPromise.ITask<{}>
Expand Down Expand Up @@ -62,7 +61,6 @@ const DEFAULT_KEYWORDS = {

import { randomUUID } from "crypto";
import * as fs from 'fs';
import { sendEmail } from "./Auth/sendEmail";

export class Prostgles {
/**
Expand Down Expand Up @@ -376,10 +374,7 @@ export class Prostgles {
if(!this.authHandler) throw "this.authHandler missing";
const userData = await this.authHandler.getClientInfo(clientInfo);
const { publishParser } = this;
let fullSchema: {
schema: TableSchemaForClient;
tables: DBSchemaTable[];
} | undefined;
let fullSchema: Awaited<ReturnType<PublishParser["getSchemaFromPublish"]>> | undefined;
let publishValidationError;

try {
Expand All @@ -395,7 +390,7 @@ export class Prostgles {
rawSQL = allowed;
}

const { schema, tables } = fullSchema ?? { schema: {}, tables: [] };
const { schema, tables, tableSchemaErrors } = fullSchema ?? { schema: {}, tables: [], tableSchemaErrors: {} };
const joinTables2: string[][] = [];
if (this.opts.joins) {
const _joinTables2 = this.dboBuilder.getAllJoinPaths()
Expand Down Expand Up @@ -433,13 +428,15 @@ export class Prostgles {
tableSchema: tables,
rawSQL,
joinTables: joinTables2,
tableSchemaErrors,
auth,
version,
err: publishValidationError? "Server Error: User publish validation failed." : undefined
};

return {
publishValidationError,
tableSchemaErrors,
clientSchema,
userData
}
Expand All @@ -460,8 +457,7 @@ export class Prostgles {

try {
const clientSchema = await this.getClientSchema({ socket });
socket.prostgles = socket.prostgles || {};
socket.prostgles.schema = clientSchema.schema;
socket.prostgles = clientSchema;
if (clientSchema.rawSQL) {
socket.removeAllListeners(CHANNELS.SQL)
socket.on(CHANNELS.SQL, async ({ query, params, options }: SQLRequest, cb = (..._callback: any) => { /* Empty */ }) => {
Expand Down
4 changes: 2 additions & 2 deletions lib/PublishParser/PublishParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ export class PublishParser {
if (!this.publish) throw "publish is missing";

/* Get any publish errors for socket */
const schm = localParams?.socket?.prostgles?.schema?.[tableName]?.[command];
const errorInfo = localParams?.socket?.prostgles?.tableSchemaErrors?.[tableName]?.[command];

if (schm && schm.err) throw schm.err;
if (errorInfo) throw errorInfo.error;

const table_rule = await this.getTableRules({ tableName, localParams }, clientInfo);
if (!table_rule) throw { stack: ["getValidatedRequestRule()"], message: "Invalid or disallowed table: " + tableName };
Expand Down
13 changes: 7 additions & 6 deletions lib/PublishParser/getSchemaFromPublish.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { DBSchemaTable, MethodKey, TableInfo, TableSchemaForClient, getKeys, pickKeys } from "prostgles-types";
import { DBSchemaTable, MethodKey, TableInfo, TableSchemaErrors, TableSchemaForClient, getKeys, pickKeys } from "prostgles-types";
import { AuthResult, ExpressReq } from "../Auth/AuthTypes";
import { getErrorAsObject, PRGLIOSocket } from "../DboBuilder/DboBuilder";
import { PublishObject, PublishParser } from "./PublishParser"
Expand All @@ -13,8 +13,10 @@ type Args = ({
}) & {
userData: AuthResult | undefined;
}
export async function getSchemaFromPublish(this: PublishParser, { userData, ...clientReq }: Args): Promise<{ schema: TableSchemaForClient; tables: DBSchemaTable[] }> {

export async function getSchemaFromPublish(this: PublishParser, { userData, ...clientReq }: Args): Promise<{ schema: TableSchemaForClient; tables: DBSchemaTable[]; tableSchemaErrors: TableSchemaErrors }> {
const schema: TableSchemaForClient = {};
const tableSchemaErrors: TableSchemaErrors = {};
let tables: DBSchemaTable[] = []

try {
Expand Down Expand Up @@ -92,9 +94,8 @@ export async function getSchemaFromPublish(this: PublishParser, { userData, ...c
}

} catch (e) {
tableSchema[method] = {
err: "Internal publish error. Check server logs"
};
tableSchemaErrors[tableName] ??= {};
tableSchemaErrors[tableName]![method] = { error: "Internal publish error. Check server logs" };

throw {
...getErrorAsObject(e),
Expand Down Expand Up @@ -137,5 +138,5 @@ export async function getSchemaFromPublish(this: PublishParser, { userData, ...c
}

tables = tables.sort((a, b) => a.name.localeCompare(b.name));
return { schema, tables };
return { schema, tables, tableSchemaErrors };
}
2 changes: 1 addition & 1 deletion lib/RestApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class RestApi {
const data = await runClientSqlRequest.bind(this.prostgles)({ type: "http", httpReq: req, query, args, options });
res.json(data);
} catch(rawError){
const error = getSerializedClientErrorFromPGError(rawError, { type: "sql" });
const error = getSerializedClientErrorFromPGError(rawError, { type: "sql", localParams: { httpReq: req } });
res.status(400).json({ error });
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/SyncReplication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ export async function syncData (this: PubSubManager, sync: SyncParams, clientDat

updateData.push([syncSafeFilter, omitKeys(upd, id_fields)])
}));
await tbl.updateBatch(updateData, { fixIssues: true }, table_rules);
await tbl.updateBatch(updateData, { fixIssues: true }, undefined, table_rules);
} else {
updates = [];
}
Expand Down
Loading

0 comments on commit 12ccc3d

Please sign in to comment.