Skip to content

Commit

Permalink
add cansubscribe check
Browse files Browse the repository at this point in the history
  • Loading branch information
prostgles committed Sep 21, 2024
1 parent 470110d commit 6dffff1
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 171 deletions.
18 changes: 16 additions & 2 deletions lib/DboBuilder/DboBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import { getDBSchema } from "../DBSchemaBuilder";
import {
DB, Prostgles
} from "../Prostgles";
import { Join } from "../ProstglesTypes";
import { PubSubManager } from "../PubSubManager/PubSubManager";
import { getCreatePubSubManagerError } from "../PubSubManager/getCreatePubSubManagerError";
import {
DbTableInfo,
PublishParser
Expand All @@ -29,7 +31,6 @@ import { PGConstraint, getCanExecute, getConstraints, getSerializedClientErrorFr
import { getTablesForSchemaPostgresSQL } from "./getTablesForSchemaPostgresSQL";
import { prepareShortestJoinPaths } from "./prepareShortestJoinPaths";
import { cacheDBTypes, runSQL } from "./runSQL";
import { Join } from "../ProstglesTypes";

export * from "./DboBuilderTypes";
export * from "./dboBuilderUtils";
Expand Down Expand Up @@ -169,8 +170,21 @@ export class DboBuilder {
runSQL = async (query: string, params: any, options: SQLOptions | undefined, localParams?: LocalParams) => {
return runSQL.bind(this)(query, params, options, localParams).catch(error => Promise.reject(getSerializedClientErrorFromPGError(error, { type: "sql" })));
}
async build(): Promise<DBHandlerServer> {

canSubscribe = false;
async build(): Promise<DBHandlerServer> {
if(!this.canSubscribe){
const subscribeError = await getCreatePubSubManagerError(this);
if(subscribeError){
console.error(
"Could not initiate PubSubManager. Realtime data/Subscriptions will not work. Error: ",
subscribeError
);
this.canSubscribe = false;
} else {
this.canSubscribe = true;
}
}
const start = Date.now();
const tablesOrViewsReq = await getTablesForSchemaPostgresSQL(this, this.prostgles.opts.schema);
await this.prostgles.opts.onLog?.({
Expand Down
8 changes: 6 additions & 2 deletions lib/DboBuilder/TableHandler/TableHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ export class TableHandler extends ViewHandler {
const start = Date.now();
try {

if(!this.dboBuilder.canSubscribe){
throw "Cannot subscribe. PubSubManager not initiated";
}

if (!localParams) throw "Sync not allowed within the server code";
const { socket } = localParams;
if (!socket) throw "socket missing";
Expand All @@ -115,7 +119,6 @@ export class TableHandler extends ViewHandler {
const invalidParams = Object.keys(params || {}).filter(k => !ALLOWED_PARAMS.includes(k));
if (invalidParams.length) throw "Invalid or dissallowed params found: " + invalidParams.join(", ");


const { synced_field, allow_delete }: SyncRule = table_rules.sync;

if (!table_rules.sync.id_fields.length || !synced_field) {
Expand Down Expand Up @@ -154,7 +157,8 @@ export class TableHandler extends ViewHandler {
return pubSubManager.addSync({
table_info: this.tableOrViewInfo,
condition,
id_fields, synced_field,
id_fields,
synced_field,
allow_delete,
socket,
table_rules,
Expand Down
4 changes: 4 additions & 0 deletions lib/DboBuilder/ViewHandler/subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ async function subscribe(this: ViewHandler, filter: Filter, params: SubscribePar
const start = Date.now();
try {

if(!this.dboBuilder.canSubscribe){
throw "Cannot subscribe. PubSubManager not initiated";
}

if (this.tx) {
throw "subscribe not allowed within transactions";
}
Expand Down
6 changes: 3 additions & 3 deletions lib/PubSubManager/PubSubManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
* Licensed under the MIT License. See LICENSE in the project root for license information.
*--------------------------------------------------------------------------------------------*/

import { DBHandlerServer, DboBuilder, PRGLIOSocket, TableInfo, TableOrViewInfo, getCanExecute } from "../DboBuilder/DboBuilder";
import * as crypto from "crypto";
import { DBHandlerServer, DboBuilder, PRGLIOSocket, TableInfo, TableOrViewInfo } from "../DboBuilder/DboBuilder";
import { PostgresNotifListenManager } from "../PostgresNotifListenManager";
import { DB, getIsSuperUser } from "../Prostgles";
import { addSync } from "./addSync";
import { initPubSubManager } from "./initPubSubManager";
import * as crypto from "crypto";

import * as Bluebird from "bluebird";
import * as pgPromise from 'pg-promise';
Expand All @@ -25,8 +25,8 @@ import { syncData } from "../SyncReplication";
import { addSub } from "./addSub";
import { DB_OBJ_NAMES } from "./getPubSubManagerInitQuery";
import { notifListener } from "./notifListener";
import { pushSubData } from "./pushSubData";
import { DELETE_DISCONNECTED_APPS_QUERY } from "./orphanTriggerCheck";
import { pushSubData } from "./pushSubData";

type PGP = pgPromise.IMain<{}, pg.IClient>;
const pgp: PGP = pgPromise({
Expand Down
52 changes: 52 additions & 0 deletions lib/PubSubManager/getCreatePubSubManagerError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { tryCatch } from "prostgles-types";
import { getPubSubManagerInitQuery } from "./getPubSubManagerInitQuery";
import { getCanExecute } from "../DboBuilder/dboBuilderUtils";
import { DboBuilder } from "../DboBuilder/DboBuilder";

export const getCreatePubSubManagerError = async (dboBuilder: DboBuilder): Promise<string | undefined> => {
const db = dboBuilder.db;
const canExecute = await getCanExecute(db)
if (!canExecute) return "Cannot run EXECUTE statements on this connection";

/** Check if prostgles schema exists */
const prglSchema = await db.any(`
SELECT *
FROM pg_catalog.pg_namespace
WHERE nspname = 'prostgles'
`);

const checkIfCanCreateProstglesSchema = () => tryCatch(async () => {
const allGood = await db.tx(async t => {
await t.none(`
CREATE SCHEMA IF NOT EXISTS prostgles;
ROLLBACK;
`);

return true;
});

return allGood;
});

if (!prglSchema.length) {
const canCreate = await checkIfCanCreateProstglesSchema();
if (!canCreate) {
const dbName = await db.one(`SELECT current_database()`);
const user = await db.one(`SELECT current_user`);
return `Not allowed to create prostgles schema. GRANT CREATE ON DATABASE ${dbName} TO ${user}`;
}
return undefined;
}

const initQuery = await tryCatch(async () => ({ query: await getPubSubManagerInitQuery.bind(dboBuilder)() }));
if(initQuery.hasError){
console.error(initQuery.error);
return "Could not get initQuery. Check logs";
}

if(!initQuery.query){
return undefined;
}

return undefined;
}
5 changes: 3 additions & 2 deletions lib/PubSubManager/getPubSubManagerInitQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { pgp } from "../DboBuilder/DboBuilderTypes";
import { asValue, NOTIF_CHANNEL, NOTIF_TYPE, PubSubManager } from "./PubSubManager";
const { version } = require("../../package.json");
import { getAppCheckQuery } from "./orphanTriggerCheck";
import { DboBuilder } from "../DboBuilder/DboBuilder";

export const DB_OBJ_NAMES = {
trigger_add_remove_func: "prostgles.trigger_add_remove_func",
Expand Down Expand Up @@ -626,9 +627,9 @@ COMMIT;
* Initialize the prostgles schema and functions needed for realtime data and schema changes
* undefined returned if the database contains the apropriate prostgles schema
*/
export const getPubSubManagerInitQuery = async function(this: PubSubManager): Promise<string | undefined> {
export const getPubSubManagerInitQuery = async function(this: DboBuilder): Promise<string | undefined> {

const initQuery = getInitQuery(this.dboBuilder.prostgles.opts.DEBUG_MODE);
const initQuery = getInitQuery(this.prostgles.opts.DEBUG_MODE);
const { schema_md5 = "none" } = await this.db.oneOrNone("SELECT md5($1) as schema_md5", [initQuery.trim()]);
const query = pgp.as.format(initQuery, { schema_md5, version });
const existingSchema = await this.db.any(PROSTGLES_SCHEMA_EXISTS_QUERY);
Expand Down
2 changes: 1 addition & 1 deletion lib/PubSubManager/initPubSubManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export async function initPubSubManager(this: PubSubManager): Promise<PubSubMana

try {

const initQuery = await getPubSubManagerInitQuery.bind(this)();
const initQuery = await getPubSubManagerInitQuery.bind(this.dboBuilder)();

/**
* High database activity might cause deadlocks.
Expand Down
4 changes: 4 additions & 0 deletions lib/PublishParser/getSchemaFromPublish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ export async function getSchemaFromPublish(this: PublishParser, { userData, ...c
methods = getKeys(table_rules) as any;
}

if(!this.prostgles.dboBuilder.canSubscribe){
methods = methods.filter(m => !["subscribe", "subscribeOne", "sync", "unsubscribe", "unsync"].includes(m));
}

await Promise.all(methods.filter(m => m !== "select" as any)
.map(async method => {
if (method === "sync" && table_rules[method]) {
Expand Down
Loading

0 comments on commit 6dffff1

Please sign in to comment.