Skip to content

Commit

Permalink
Executed post-hooks in series & the subscriptions synchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
yvann committed Oct 9, 2024
1 parent 67ac4fe commit e68d93c
Show file tree
Hide file tree
Showing 15 changed files with 243 additions and 171 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"devDependencies": {
"jest": "30.0.0-alpha.6",
"prettier": "3.3.3",
"typescript": "5.6.2"
"typescript": "5.6.3"
},
"packageManager": "[email protected]"
}
8 changes: 4 additions & 4 deletions packages/graphql-platform-connector-mariadb/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,23 @@
"@swc/core": "1.7.28",
"@swc/jest": "0.2.36",
"@tsconfig/node20": "20.1.4",
"@types/node": "20.16.10",
"@types/node": "20.16.11",
"@types/semver": "7.5.8",
"graphql": "16.9.0",
"jest": "30.0.0-alpha.6",
"publint": "0.2.11",
"typescript": "5.6.2"
"typescript": "5.6.3"
},
"dependencies": {
"@prismamedia/async-event-emitter": "^5.5.1",
"@prismamedia/async-event-emitter": "^6.0.0",
"@prismamedia/graphql-platform": "workspace:packages/graphql-platform",
"@prismamedia/graphql-platform-scalars": "workspace:packages/graphql-platform-scalars",
"@prismamedia/graphql-platform-utils": "workspace:packages/graphql-platform-utils",
"@prismamedia/memoize": "^5.0.3",
"dependency-graph": "^1.0.0",
"inflection": "^3.0.0",
"mariadb": "^3.3.2",
"remeda": "^2.14.0",
"remeda": "^2.15.0",
"semver": "^7.6.3",
"type-fest": "^4.26.1"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@
"@types/body-parser": "^1.19.5",
"@types/cors": "^2.8.17",
"@types/express": "^4.17.21",
"@types/node": "20.16.10",
"@types/node": "20.16.11",
"@types/ws": "^8.5.12",
"body-parser": "1.20.3",
"cors": "2.8.5",
"express": "4.21.0",
"express": "4.21.1",
"graphql": "16.9.0",
"graphql-ws": "5.16.0",
"jest": "30.0.0-alpha.6",
"publint": "0.2.11",
"typescript": "5.6.2",
"typescript": "5.6.3",
"ws": "8.18.0"
},
"dependencies": {
Expand Down
6 changes: 3 additions & 3 deletions packages/graphql-platform-scalars/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@
"@swc/core": "1.7.28",
"@swc/jest": "0.2.36",
"@tsconfig/node20": "20.1.4",
"@types/node": "20.16.10",
"@types/node": "20.16.11",
"graphql": "16.9.0",
"jest": "30.0.0-alpha.6",
"publint": "0.2.11",
"typescript": "5.6.2"
"typescript": "5.6.3"
},
"dependencies": {
"@prismamedia/graphql-platform-utils": "workspace:packages/graphql-platform-utils",
"@types/draft-js": "^0.11.18",
"remeda": "^2.14.0",
"remeda": "^2.15.0",
"type-fest": "^4.26.1"
},
"peerDependencies": {
Expand Down
6 changes: 3 additions & 3 deletions packages/graphql-platform-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@
"@swc/jest": "0.2.36",
"@tsconfig/node20": "20.1.4",
"@types/indefinite": "2.3.4",
"@types/node": "20.16.10",
"@types/node": "20.16.11",
"graphql": "16.9.0",
"jest": "30.0.0-alpha.6",
"publint": "0.2.11",
"typescript": "5.6.2"
"typescript": "5.6.3"
},
"dependencies": {
"@prismamedia/memoize": "^5.0.3",
"indefinite": "^2.5.1",
"remeda": "^2.14.0",
"remeda": "^2.15.0",
"type-fest": "^4.26.1"
},
"peerDependencies": {
Expand Down
8 changes: 4 additions & 4 deletions packages/graphql-platform/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
"@swc/core": "1.7.28",
"@swc/jest": "0.2.36",
"@tsconfig/node20": "20.1.4",
"@types/node": "20.16.10",
"@types/node": "20.16.11",
"graphql": "16.9.0",
"jest": "30.0.0-alpha.6",
"publint": "0.2.11",
"typescript": "5.6.2"
"typescript": "5.6.3"
},
"dependencies": {
"@prismamedia/async-event-emitter": "^5.5.1",
"@prismamedia/async-event-emitter": "^6.0.0",
"@prismamedia/graphql-platform-scalars": "workspace:packages/graphql-platform-scalars",
"@prismamedia/graphql-platform-utils": "workspace:packages/graphql-platform-utils",
"@prismamedia/memoize": "^5.0.3",
Expand All @@ -50,7 +50,7 @@
"inflection": "^3.0.0",
"p-queue": "^8.0.1",
"p-retry": "^6.2.0",
"remeda": "^2.14.0",
"remeda": "^2.15.0",
"type-fest": "^4.26.1"
},
"peerDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/graphql-platform/src/broker-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export interface BrokerInterface {
/**
* Notify the broker about the given, local, node-changes
*/
publish(changes: NodeChangeAggregation): Promisable<void>;
publish(nodeChanges: NodeChangeAggregation): Promisable<void>;

/**
* Do whatever is needed to initialize the subscription and to subscribe to the node-changes
Expand Down
14 changes: 11 additions & 3 deletions packages/graphql-platform/src/broker/in-memory.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { EventListener } from '@prismamedia/async-event-emitter';
import assert from 'node:assert/strict';
import type { BrokerInterface } from '../broker-interface.js';
import type { GraphQLPlatform } from '../index.js';
import type {
Expand All @@ -19,8 +20,12 @@ export class InMemoryBroker implements BrokerInterface {
this.#subscriptions = new Map();
}

public publish(changes: NodeChangeAggregation): void {
this.#subscriptions.forEach((queue) => queue.enqueue(changes));
public async publish(nodeChanges: NodeChangeAggregation): Promise<void> {
await Promise.all(
Array.from(this.#subscriptions.values()).map((queue) =>
queue.enqueue(nodeChanges),
),
);
}

public subscribe(
Expand All @@ -36,7 +41,10 @@ export class InMemoryBroker implements BrokerInterface {
subscription: ChangesSubscriptionStream,
listener: EventListener<InMemorySubscriptionEvents, 'idle'>,
): void {
this.#subscriptions.get(subscription)?.onIdle(listener);
const queue = this.#subscriptions.get(subscription);
assert(queue, `The subscription is not registered`);

queue.onIdle(listener);
}

public async waitForIdle(
Expand Down
32 changes: 29 additions & 3 deletions packages/graphql-platform/src/broker/in-memory/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,39 @@ export class InMemorySubscription
}

public async [Symbol.asyncDispose](): Promise<void> {
this.off();
this.#queue.clear();
this.broker.unsubscribe(this.subscription);
}

public enqueue(changes: NodeChangeAggregation): void {
this.#queue.push(changes);
this.emit('enqueued', changes);
public async enqueue(
nodeChanges: NodeChangeAggregation,
waitUntilProcessed: boolean = this.subscription.consumingNodeChanges,
): Promise<void> {
if (!this.subscription.isAffectedBy(nodeChanges)) {
return;
}

const processing = waitUntilProcessed
? new Promise<void>((resolve) => {
const off = this.on(
'dequeued',
(dequeued) => {
if (nodeChanges === dequeued) {
off();
resolve();
}
},
this.subscription.signal,
() => resolve(),
);
})
: undefined;

this.#queue.push(nodeChanges);
await this.emit('enqueued', nodeChanges);

return processing;
}

public async *[Symbol.asyncIterator](): AsyncIterator<
Expand Down
2 changes: 1 addition & 1 deletion packages/graphql-platform/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ export class GraphQLPlatform<
if (aggregation.size) {
await Promise.all([
this.emit('node-changes', aggregation),
this.broker?.publish(aggregation),
this.broker.publish(aggregation),
]);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,13 @@ export class CreateSomeMutation<
return [];
}

// As the "data" will be provided to the hooks, we freeze it
Object.freeze(args.data);

// Build the "creation" statements based on the provided "data" argument
const creations: NodeCreationStatement[] = [];

for (const [index, data] of args.data.entries()) {
const indexedPath =
args.data.length > 1 ? utils.addPath(path, index) : path;

// As the "data" will be provided to the hooks, we freeze it
Object.freeze(data);

// Resolve the edges' nested-actions into their value
const value = await this.node.creationInputType.resolveValue(
data,
Expand Down Expand Up @@ -120,58 +114,48 @@ export class CreateSomeMutation<
{ path },
);

const newSources = await Promise.all(
rawNewSources.map(async (rawNewSource, index) => {
const indexedPath =
rawNewSources.length > 1 ? utils.addPath(path, index) : path;
const changes: NodeCreation[] = [];

const change = new NodeCreation(
this.node,
context.request,
rawNewSource,
);
for (const [index, rawNewSource] of rawNewSources.entries()) {
const change = new NodeCreation(this.node, context.request, rawNewSource);
changes.push(change);

// Let's everybody know about this created node
context.track(change);
// Let's everybody know about this created node
context.track(change);

// The "data" has been frozen above
const data = args.data[index];
const data = args.data[index];

// Apply the reverse-edges' actions
await this.node.creationInputType.applyReverseEdgeActions(
change.newValue,
data,
context,
indexedPath,
);

// Apply the "postCreate"-hook, if any
try {
await this.node.postCreate({ context, data, change });
} catch (cause) {
throw new LifecycleHookError(
this.node,
LifecycleHookKind.POST_CREATE,
{ cause, path: indexedPath },
);
}
const indexedPath =
rawNewSources.length > 1 ? utils.addPath(path, index) : path;

return change.newValue;
}),
);
// Apply the reverse-edges' actions
await this.node.creationInputType.applyReverseEdgeActions(
change.newValue,
data,
context,
indexedPath,
);

// Apply the "postCreate"-hook, if any
try {
await this.node.postCreate({ context, data, change });
} catch (cause) {
throw new LifecycleHookError(this.node, LifecycleHookKind.POST_CREATE, {
cause,
path: indexedPath,
});
}
}

return args.selection.isPure()
? newSources.map((newSource) => args.selection.pickValue(newSource))
: this.node.getQueryByKey('get-some-in-order').internal(
context,
authorization,
{
where: newSources.map((newSource) =>
this.node.mainIdentifier.selection.pickValue(newSource),
),
selection: args.selection,
},
path,
);
? changes.map((change) => args.selection.pickValue(change.newValue))
: this.node
.getQueryByKey('get-some-in-order')
.internal(
context,
authorization,
{ where: changes.map(({ id }) => id), selection: args.selection },
path,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ export class DeleteManyMutation<
try {
await this.node.preDelete({
context,
id: Object.freeze(ids[index]),
current: Object.freeze(this.node.selection.pickValue(oldValue)),
id: ids[index],
current: this.node.selection.pickValue(oldValue),
});
} catch (cause) {
throw new LifecycleHookError(
Expand Down Expand Up @@ -241,26 +241,23 @@ export class DeleteManyMutation<
{ path },
);

return Promise.all(
oldValues.map(async (oldValue) => {
const change = new NodeDeletion(this.node, context.request, oldValue);
for (const oldValue of oldValues) {
const change = new NodeDeletion(this.node, context.request, oldValue);

// Let's everybody know about this deleted node
context.track(change);
// Let's everybody know about this deleted node
context.track(change);

// Apply the "postDelete"-hook, if any
try {
await this.node.postDelete({ context, change });
} catch (cause) {
throw new LifecycleHookError(
this.node,
LifecycleHookKind.POST_DELETE,
{ cause, path },
);
}
// Apply the "postDelete"-hook, if any
try {
await this.node.postDelete({ context, change });
} catch (cause) {
throw new LifecycleHookError(this.node, LifecycleHookKind.POST_DELETE, {
cause,
path,
});
}
}

return args.selection.pickValue(oldValue);
}),
);
return oldValues.map((oldValue) => args.selection.pickValue(oldValue));
}
}
Loading

0 comments on commit e68d93c

Please sign in to comment.