Skip to content

Commit

Permalink
[payment-endpoint] GitHub: Run reconcilePendingEvents and bookkeeping…
Browse files Browse the repository at this point in the history
… in same transaction
  • Loading branch information
geropl authored and roboquat committed Nov 26, 2021
1 parent 9e41eb0 commit 1ed5952
Showing 1 changed file with 27 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { inject, injectable } from "inversify";
import * as fs from 'fs';
import { log } from '@gitpod/gitpod-protocol/lib/util/logging';
import * as jwt from 'jsonwebtoken';
import { PendingGithubEventDB, UserDB } from "@gitpod/gitpod-db/lib";
import { PendingGithubEventDB, TransactionalPendingGithubEventDBFactory, UserDB } from "@gitpod/gitpod-db/lib";
import { GithubSubscriptionMapper, MarketplaceEventAll } from "./subscription-mapper";
import { User, Queue } from "@gitpod/gitpod-protocol";
import { UserPaidSubscription } from "@gitpod/gitpod-protocol/lib/accounting-protocol";
Expand All @@ -18,6 +18,7 @@ import { SubscriptionModel } from "../accounting/subscription-model";
import { Plans, Plan } from "@gitpod/gitpod-protocol/lib/plans";
import * as Webhooks from '@octokit/webhooks';
import fetch from "node-fetch";
import { EntityManager } from 'typeorm';

@injectable()
export class GithubSubscriptionReconciler {
Expand All @@ -26,6 +27,7 @@ export class GithubSubscriptionReconciler {
@inject(AccountingDB) protected readonly accountingDB: AccountingDB;
@inject(UserDB) protected readonly userDB: UserDB;
@inject(Config) protected readonly config: Config;
@inject(TransactionalPendingGithubEventDBFactory) protected readonly transactionalFactory: TransactionalPendingGithubEventDBFactory;
protected privateKey: string | undefined;
protected reconciliationTasks = new Queue();

Expand All @@ -34,9 +36,6 @@ export class GithubSubscriptionReconciler {
const authId: string = evt.payload.marketplace_purchase.account.id.toString();
const user = await this.userDB.findUserByIdentity({ authProviderId: "Public-GitHub", authId });
if (user) {
// This will become a performance bottleneck if we have too many operations coming in.
// The only reason we're enqueueing the event reconciliation here is because pollAndReconcilFromGithub does
// not run in a transaction. Maybe we should do that instead.
await this.reconciliationTasks.enqueue(() => this.reconcileEvent(user, evt));
} else {
log.error("Received GitHub marketplace purchase event for an unknown user. Storing for later use.", {evt});
Expand All @@ -55,8 +54,10 @@ export class GithubSubscriptionReconciler {
for (const evt of pendingPurchaseEvents) {
try {
const githubEvt = JSON.parse(evt.event);
await this.reconcileEvent(evt.identity.user, githubEvt);
await this.pendingEventsDB.delete(evt);
await this.reconcileEvent(evt.identity.user, githubEvt, async (em: EntityManager) => {
const pendingEventsDB = this.transactionalFactory(em);
await pendingEventsDB.delete(evt);
});
log.debug({userId: evt.identity.user.id}, "followed up on pending purchasing event", {evt});
} catch(err) {
log.debug("could not follow up on pending event", err);
Expand Down Expand Up @@ -169,11 +170,7 @@ export class GithubSubscriptionReconciler {

if (model) {
// we have some changes that we need to write to the database
const delta = model.getResult();
await Promise.all([
...delta.updates.map(s => this.accountingDB.storeSubscription(s)),
...delta.inserts.map(s => this.accountingDB.newSubscription(s))
]);
await this.persistModel(model);
}
}

Expand All @@ -196,11 +193,7 @@ export class GithubSubscriptionReconciler {
// We have an active subscription, but not for this plan. Let's change to new plan.
const model = new SubscriptionModel(sub.userId, [ sub ]);
this.subscriptionMapper.mapSubscriptionCancel(sub.userId, new Date().toISOString(), model);
const delta = model.getResult();
await Promise.all([
...delta.updates.map(s => this.accountingDB.storeSubscription(s)),
...delta.inserts.map(s => this.accountingDB.newSubscription(s))
]);
await this.persistModel(model);
}
}
}
Expand All @@ -219,27 +212,30 @@ export class GithubSubscriptionReconciler {
}), 24 * 60 * 1000); // once a day
}

public async reconcileEvent(user: User, evt: MarketplaceEventAll) {
public async reconcileEvent(user: User, evt: MarketplaceEventAll, runInTransaction?: (manager: EntityManager) => Promise<void>) {
const userId = user.id;
await this.accountingDB.transaction(async db => {
const subscriptions = await db.findAllSubscriptionsForUser(userId);
const userPaidSubscriptions = subscriptions.filter(s => UserPaidSubscription.is(s) && s.paymentReference.startsWith("github:"));

const model = new SubscriptionModel(userId, userPaidSubscriptions);
const success = await this.subscriptionMapper.map(evt, model);
if (!success) {
log.debug({userId}, "subscription mapper did not succeed for GitHub market purchase event. See errors above.", { evt });
return;
}
const subscriptions = await this.accountingDB.findAllSubscriptionsForUser(userId);
const userPaidSubscriptions = subscriptions.filter(s => UserPaidSubscription.is(s) && s.paymentReference.startsWith("github:"));

const model = new SubscriptionModel(userId, userPaidSubscriptions);
const success = await this.subscriptionMapper.map(evt, model);
if (!success) {
log.debug({userId}, "subscription mapper did not succeed for GitHub market purchase event. See errors above.", { evt });
return;
}
await this.persistModel(model, runInTransaction);
}

const delta = model.getResult();
async persistModel(model: SubscriptionModel, runInTransaction?: ((manager: EntityManager) => Promise<void>)) {
const closures = runInTransaction ? [runInTransaction] : [];
const delta = model.getResult();
await this.accountingDB.transaction(async db => {
await Promise.all([
...delta.updates.map(s => db.storeSubscription(s)),
...delta.inserts.map(s => db.newSubscription(s))
...delta.inserts.map(s => db.newSubscription(s)),
]);
});
}, closures);
}

}


Expand Down

0 comments on commit 1ed5952

Please sign in to comment.