From 04a6032cbbfed4aedbab0bc9459efba8df68f2d6 Mon Sep 17 00:00:00 2001 From: Gero Posmyk-Leinemann Date: Mon, 13 Nov 2023 13:42:01 +0000 Subject: [PATCH] [server] Introduce RequestContext --- components/gitpod-protocol/src/analytics.ts | 4 +- .../gitpod-protocol/src/messaging/error.ts | 3 + .../gitpod-protocol/src/util/logging.ts | 1 + components/server/src/analytics.ts | 31 +++- .../src/api/auth-provider-service-api.ts | 51 +++--- .../src/api/configuration-service-api.ts | 25 ++- .../src/api/handler-context-augmentation.d.ts | 13 -- .../server/src/api/hello-service-api.ts | 20 ++- .../src/api/organization-service-api.ts | 69 ++++---- components/server/src/api/server.ts | 68 ++++---- .../server/src/api/workspace-service-api.ts | 11 +- components/server/src/auth/subject-id.ts | 76 +++++++++ .../caching-spicedb-authorizer.spec.db.ts | 77 ++++++--- .../src/authorization/spicedb-authorizer.ts | 23 +-- components/server/src/container-module.ts | 8 +- components/server/src/jobs/runner.ts | 12 +- .../server/src/messaging/redis-subscriber.ts | 12 +- components/server/src/prebuilds/github-app.ts | 28 ++- .../src/prebuilds/github-enterprise-app.ts | 6 +- components/server/src/server.ts | 23 ++- components/server/src/util/log-context.ts | 29 ++-- components/server/src/util/request-context.ts | 160 +++++++++++++++--- .../websocket/websocket-connection-manager.ts | 26 ++- .../src/workspace/gitpod-server-impl.ts | 9 +- .../server/src/workspace/workspace-service.ts | 3 +- .../server/src/workspace/workspace-starter.ts | 6 +- 26 files changed, 569 insertions(+), 225 deletions(-) delete mode 100644 components/server/src/api/handler-context-augmentation.d.ts create mode 100644 components/server/src/auth/subject-id.ts diff --git a/components/gitpod-protocol/src/analytics.ts b/components/gitpod-protocol/src/analytics.ts index 89cf67038f8814..12a6b9fc48426d 100644 --- a/components/gitpod-protocol/src/analytics.ts +++ b/components/gitpod-protocol/src/analytics.ts @@ -6,9 +6,7 @@ export const IAnalyticsWriter = Symbol("IAnalyticsWriter"); -type Identity = - | { userId: string | number; anonymousId?: string | number } - | { userId?: string | number; anonymousId: string | number }; +type Identity = { userId?: string | number; anonymousId?: string | number; subjectId?: string }; interface Message { messageId?: string; diff --git a/components/gitpod-protocol/src/messaging/error.ts b/components/gitpod-protocol/src/messaging/error.ts index 52f5bbef4b7d17..614f5b985d9346 100644 --- a/components/gitpod-protocol/src/messaging/error.ts +++ b/components/gitpod-protocol/src/messaging/error.ts @@ -65,6 +65,9 @@ export const ErrorCodes = { // 404 Not Found NOT_FOUND: 404 as const, + // 408 Request Timeout + REQUEST_TIMEOUT: 408 as const, + // 409 Conflict (e.g. already existing) CONFLICT: 409 as const, diff --git a/components/gitpod-protocol/src/util/logging.ts b/components/gitpod-protocol/src/util/logging.ts index 4542b703782d71..3ed477f1bc3a92 100644 --- a/components/gitpod-protocol/src/util/logging.ts +++ b/components/gitpod-protocol/src/util/logging.ts @@ -17,6 +17,7 @@ export interface LogContext { organizationId?: string; sessionId?: string; userId?: string; + subjectId?: string; workspaceId?: string; instanceId?: string; } diff --git a/components/server/src/analytics.ts b/components/server/src/analytics.ts index 0bc74292709dcb..5a886c4c036a87 100644 --- a/components/server/src/analytics.ts +++ b/components/server/src/analytics.ts @@ -5,9 +5,10 @@ */ import { User } from "@gitpod/gitpod-protocol"; import { Request } from "express"; -import { IAnalyticsWriter } from "@gitpod/gitpod-protocol/lib/analytics"; +import { IAnalyticsWriter, IdentifyMessage, PageMessage, TrackMessage } from "@gitpod/gitpod-protocol/lib/analytics"; import * as crypto from "crypto"; import { clientIp } from "./express-util"; +import { ctxTrySubjectId } from "./util/request-context"; export async function trackLogin(user: User, request: Request, authHost: string, analytics: IAnalyticsWriter) { // make new complete identify call for each login @@ -129,3 +130,31 @@ function stripCookie(cookie: string) { return cookie; } } + +export class ContextAwareAnalyticsWriter implements IAnalyticsWriter { + constructor(readonly writer: IAnalyticsWriter) {} + + identify(msg: IdentifyMessage): void {} + + track(msg: TrackMessage): void {} + + page(msg: PageMessage): void { + const traceIds = this.getAnalyticsIds(); + this.writer.page({ + ...msg, + userId: msg.userId || traceIds.userId, + subjectId: msg.subjectId || traceIds.subjectId, + }); + } + + private getAnalyticsIds(): { userId?: string; subjectId?: string } { + const subjectId = ctxTrySubjectId(); + if (!subjectId) { + return {}; + } + return { + userId: subjectId.userId(), + subjectId: subjectId.toString(), + }; + } +} diff --git a/components/server/src/api/auth-provider-service-api.ts b/components/server/src/api/auth-provider-service-api.ts index 72fc00b500fdd1..63f673ee65aec8 100644 --- a/components/server/src/api/auth-provider-service-api.ts +++ b/components/server/src/api/auth-provider-service-api.ts @@ -23,21 +23,24 @@ import { DeleteAuthProviderResponse, } from "@gitpod/public-api/lib/gitpod/v1/authprovider_pb"; import { AuthProviderService } from "../auth/auth-provider-service"; -import { AuthProviderEntry, AuthProviderInfo } from "@gitpod/gitpod-protocol"; +import { AuthProviderEntry, AuthProviderInfo, User } from "@gitpod/gitpod-protocol"; import { Unauthenticated } from "./unauthenticated"; import { validate as uuidValidate } from "uuid"; import { selectPage } from "./pagination"; +import { ctxUserId } from "../util/request-context"; +import { UserService } from "../user/user-service"; @injectable() export class AuthProviderServiceAPI implements ServiceImpl { constructor( @inject(PublicAPIConverter) private readonly apiConverter: PublicAPIConverter, @inject(AuthProviderService) private readonly authProviderService: AuthProviderService, + @inject(UserService) private readonly userService: UserService, ) {} async createAuthProvider( request: CreateAuthProviderRequest, - context: HandlerContext, + _: HandlerContext, ): Promise { const ownerId = request.owner.case === "ownerId" ? request.owner.value : ""; const organizationId = request.owner.case === "organizationId" ? request.owner.value : ""; @@ -47,10 +50,10 @@ export class AuthProviderServiceAPI implements ServiceImpl { + async getAuthProvider(request: GetAuthProviderRequest, _: HandlerContext): Promise { if (!request.authProviderId) { throw new ConnectError("authProviderId is required", Code.InvalidArgument); } - const authProvider = await this.authProviderService.getAuthProvider(context.user.id, request.authProviderId); + const authProvider = await this.authProviderService.getAuthProvider(ctxUserId(), request.authProviderId); if (!authProvider) { throw new ConnectError("Provider not found.", Code.NotFound); } @@ -86,7 +89,7 @@ export class AuthProviderServiceAPI implements ServiceImpl { const target = request.id; const ownerId = target.case === "userId" ? target.value : ""; @@ -97,8 +100,8 @@ export class AuthProviderServiceAPI implements ServiceImpl { - const user = context.user; + const userId = ctxUserId(); + let user: User | undefined = undefined; + if (userId) { + user = await this.userService.findUserById(userId, userId); + } const aps = user ? await this.authProviderService.getAuthProviderDescriptions(user) : await this.authProviderService.getAuthProviderDescriptionsUnauthenticated(); @@ -135,7 +142,7 @@ export class AuthProviderServiceAPI implements ServiceImpl { if (!request.authProviderId) { throw new ConnectError("authProviderId is required", Code.InvalidArgument); @@ -146,23 +153,23 @@ export class AuthProviderServiceAPI implements ServiceImpl { if (!request.authProviderId) { throw new ConnectError("authProviderId is required", Code.InvalidArgument); } - const authProvider = await this.authProviderService.getAuthProvider(context.user.id, request.authProviderId); + const authProvider = await this.authProviderService.getAuthProvider(ctxUserId(), request.authProviderId); if (!authProvider) { throw new ConnectError("Provider not found.", Code.NotFound); } if (authProvider.organizationId) { await this.authProviderService.deleteAuthProviderOfOrg( - context.user.id, + ctxUserId(), authProvider.organizationId, request.authProviderId, ); } else { - await this.authProviderService.deleteAuthProviderOfUser(context.user.id, request.authProviderId); + await this.authProviderService.deleteAuthProviderOfUser(ctxUserId(), request.authProviderId); } return new DeleteAuthProviderResponse(); diff --git a/components/server/src/api/configuration-service-api.ts b/components/server/src/api/configuration-service-api.ts index 6d98b59b271e69..d94c0020a1a99d 100644 --- a/components/server/src/api/configuration-service-api.ts +++ b/components/server/src/api/configuration-service-api.ts @@ -20,6 +20,8 @@ import { } from "@gitpod/public-api/lib/gitpod/v1/configuration_pb"; import { PaginationResponse } from "@gitpod/public-api/lib/gitpod/v1/pagination_pb"; import { ApplicationError, ErrorCodes } from "@gitpod/gitpod-protocol/lib/messaging/error"; +import { ctxUserId } from "../util/request-context"; +import { UserService } from "../user/user-service"; @injectable() export class ConfigurationServiceAPI implements ServiceImpl { @@ -28,11 +30,13 @@ export class ConfigurationServiceAPI implements ServiceImpl { if (!req.organizationId) { throw new ApplicationError(ErrorCodes.BAD_REQUEST, "organization_id is required"); @@ -44,6 +48,11 @@ export class ConfigurationServiceAPI implements ServiceImpl { - async sayHello(req: SayHelloRequest, context: HandlerContext): Promise { + async sayHello(req: SayHelloRequest, _: HandlerContext): Promise { const response = new SayHelloResponse(); - response.reply = "Hello " + this.getSubject(context); + response.reply = "Hello " + getSubject(); return response; } - async *lotsOfReplies(req: LotsOfRepliesRequest, context: HandlerContext): AsyncGenerator { + async *lotsOfReplies(req: LotsOfRepliesRequest, _: HandlerContext): AsyncGenerator { let count = req.previousCount || 0; - while (!context.signal.aborted) { + while (true) { + ctxCheckAborted(); + const response = new LotsOfRepliesResponse(); - response.reply = `Hello ${this.getSubject(context)} ${count}`; + response.reply = `Hello ${getSubject()} ${count}`; response.count = count; yield response; count++; await new Promise((resolve) => setTimeout(resolve, 30000)); } } +} - private getSubject(context: HandlerContext): string { - return User.getName(context.user) || "World"; - } +function getSubject(): string { + return ctxTrySubjectId()?.toString() || "World"; } diff --git a/components/server/src/api/organization-service-api.ts b/components/server/src/api/organization-service-api.ts index a5448b5d50d247..4f2c798b6b10ae 100644 --- a/components/server/src/api/organization-service-api.ts +++ b/components/server/src/api/organization-service-api.ts @@ -41,6 +41,8 @@ import { OrganizationService } from "../orgs/organization-service"; import { OrganizationSettings as ProtocolOrganizationSettings } from "@gitpod/gitpod-protocol"; import { PaginationResponse } from "@gitpod/public-api/lib/gitpod/v1/pagination_pb"; import { validate as uuidValidate } from "uuid"; +import { ctxUserId } from "../util/request-context"; +import { ApplicationError, ErrorCodes } from "@gitpod/gitpod-protocol/lib/messaging/error"; @injectable() export class OrganizationServiceAPI implements ServiceImpl { @@ -51,22 +53,24 @@ export class OrganizationServiceAPI implements ServiceImpl { - const org = await this.orgService.createOrganization(context.user.id, req.name); + async createOrganization(req: CreateOrganizationRequest, _: HandlerContext): Promise { + // TODO(gpl) This mimicks the current behavior of adding the subjectId as owner + const ownerId = ctxUserId(); + if (!ownerId) { + throw new ApplicationError(ErrorCodes.BAD_REQUEST, "No userId available"); + } + const org = await this.orgService.createOrganization(ownerId, req.name); const response = new CreateOrganizationResponse(); response.organization = this.apiConverter.toOrganization(org); return response; } - async getOrganization(req: GetOrganizationRequest, context: HandlerContext): Promise { + async getOrganization(req: GetOrganizationRequest, _: HandlerContext): Promise { if (!uuidValidate(req.organizationId)) { throw new ConnectError("organizationId is required", Code.InvalidArgument); } - const org = await this.orgService.getOrganization(context.user.id, req.organizationId); + const org = await this.orgService.getOrganization(ctxUserId(), req.organizationId); const response = new GetOrganizationResponse(); response.organization = this.apiConverter.toOrganization(org); return response; @@ -74,7 +78,7 @@ export class OrganizationServiceAPI implements ServiceImpl { if (!uuidValidate(req.organizationId)) { throw new ConnectError("organizationId is required", Code.InvalidArgument); @@ -83,7 +87,9 @@ export class OrganizationServiceAPI implements ServiceImpl { const orgs = await this.orgService.listOrganizations( - context.user.id, + ctxUserId(), { limit: req.pagination?.pageSize || 100, offset: (req.pagination?.page || 0) * (req.pagination?.pageSize || 0), @@ -110,36 +116,36 @@ export class OrganizationServiceAPI implements ServiceImpl { if (!uuidValidate(req.organizationId)) { throw new ConnectError("organizationId is required", Code.InvalidArgument); } - await this.orgService.deleteOrganization(context.user.id, req.organizationId); + await this.orgService.deleteOrganization(ctxUserId(), req.organizationId); return new DeleteOrganizationResponse(); } async getOrganizationInvitation( req: GetOrganizationInvitationRequest, - context: HandlerContext, + _: HandlerContext, ): Promise { if (!uuidValidate(req.organizationId)) { throw new ConnectError("organizationId is required", Code.InvalidArgument); } - const invitation = await this.orgService.getOrCreateInvite(context.user.id, req.organizationId); + const invitation = await this.orgService.getOrCreateInvite(ctxUserId(), req.organizationId); const response = new GetOrganizationInvitationResponse(); response.invitationId = invitation.id; return response; } - async joinOrganization(req: JoinOrganizationRequest, context: HandlerContext): Promise { + async joinOrganization(req: JoinOrganizationRequest, _: HandlerContext): Promise { if (!uuidValidate(req.invitationId)) { throw new ConnectError("invitationId is required", Code.InvalidArgument); } - const orgId = await this.orgService.joinOrganization(context.user.id, req.invitationId); + const orgId = await this.orgService.joinOrganization(ctxUserId(), req.invitationId); const result = new JoinOrganizationResponse(); result.organizationId = orgId; return result; @@ -147,13 +153,13 @@ export class OrganizationServiceAPI implements ServiceImpl { if (!uuidValidate(req.organizationId)) { throw new ConnectError("organizationId is required", Code.InvalidArgument); } - const inviteId = await this.orgService.resetInvite(context.user.id, req.organizationId); + const inviteId = await this.orgService.resetInvite(ctxUserId(), req.organizationId); const result = new ResetOrganizationInvitationResponse(); result.invitationId = inviteId.id; return result; @@ -161,13 +167,13 @@ export class OrganizationServiceAPI implements ServiceImpl { if (!uuidValidate(req.organizationId)) { throw new ConnectError("organizationId is required", Code.InvalidArgument); } - const members = await this.orgService.listMembers(context.user.id, req.organizationId); + const members = await this.orgService.listMembers(ctxUserId(), req.organizationId); //TODO pagination const response = new ListOrganizationMembersResponse(); response.members = members.map((member) => this.apiConverter.toOrganizationMember(member)); @@ -178,7 +184,7 @@ export class OrganizationServiceAPI implements ServiceImpl { if (!uuidValidate(req.organizationId)) { throw new ConnectError("organizationId is required", Code.InvalidArgument); @@ -191,13 +197,13 @@ export class OrganizationServiceAPI implements ServiceImpl members.find((member) => member.userId === req.userId)); return new UpdateOrganizationMemberResponse({ member: member && this.apiConverter.toOrganizationMember(member), @@ -206,7 +212,7 @@ export class OrganizationServiceAPI implements ServiceImpl { if (!uuidValidate(req.organizationId)) { throw new ConnectError("organizationId is required", Code.InvalidArgument); @@ -215,19 +221,19 @@ export class OrganizationServiceAPI implements ServiceImpl { if (!uuidValidate(req.organizationId)) { throw new ConnectError("organizationId is required", Code.InvalidArgument); } - const settings = await this.orgService.getSettings(context.user.id, req.organizationId); + const settings = await this.orgService.getSettings(ctxUserId(), req.organizationId); const response = new GetOrganizationSettingsResponse(); response.settings = this.apiConverter.toOrganizationSettings(settings); return response; @@ -235,7 +241,7 @@ export class OrganizationServiceAPI implements ServiceImpl { if (!uuidValidate(req.organizationId)) { throw new ConnectError("organizationId is required", Code.InvalidArgument); @@ -253,7 +259,10 @@ export class OrganizationServiceAPI implements ServiceImpl { - const logContext: LogContextOptions & { - requestId?: string; - contextTimeMs: number; - grpc_service: string; - grpc_method: string; - } = { - contextTimeMs: performance.now(), - grpc_service, - grpc_method: prop as string, + const connectContext = args[1] as HandlerContext; + const requestContext: RequestContextSeed = { + requestId: v4(), + requestKind: "public-api", + requestMethod: `${grpc_service}/${prop as string}`, + startTime: performance.now(), + signal: connectContext.signal, }; - const withRequestContext = (fn: () => T): T => runWithLogContext("public-api", logContext, fn); + + const withRequestContext = (fn: () => T): T => runWithRequestContext(requestContext, fn); const method = type.methods[prop as string]; if (!method) { @@ -170,8 +174,6 @@ export class API { grpc_type = "bidi_stream"; } - logContext.requestId = v4(); - grpcServerStarted.labels(grpc_service, grpc_method, grpc_type).inc(); const stopTimer = grpcServerHandling.startTimer({ grpc_service, grpc_method, grpc_type }); const done = (err?: ConnectError) => { @@ -195,8 +197,6 @@ export class API { throw err; }; - const context = args[1] as HandlerContext; - const rateLimit = async (subjectId: string) => { const key = `${grpc_service}/${grpc_method}`; const options = self.config.rateLimits?.[key] || RateLimited.getOptions(target, prop); @@ -217,30 +217,36 @@ export class API { } }; - const apply = async (): Promise => { - const subjectId = await self.verify(context); - const isAuthenticated = !!subjectId; + // actually call the RPC handler + const auth = async () => { + // Authenticate + const userId = await self.verify(connectContext); + const isAuthenticated = !!userId; const requiresAuthentication = !Unauthenticated.get(target, prop); - if (!isAuthenticated && requiresAuthentication) { throw new ConnectError("unauthenticated", Code.Unauthenticated); } if (isAuthenticated) { - await rateLimit(subjectId); - context.user = await self.ensureFgaMigration(subjectId); + await rateLimit(userId); + await self.ensureFgaMigration(userId); } - // TODO(at) if unauthenticated, we still need to apply enforece a rate limit - // actually call the RPC handler + return userId ? SubjectId.fromUserId(userId) : undefined; + }; + + const apply = async (): Promise => { return Reflect.apply(target[prop as any], target, args); }; if (grpc_type === "unary" || grpc_type === "client_stream") { return withRequestContext(async () => { try { - const promise = await apply>(); - const result = await promise; + const subjectId = await auth(); + const result = await runWithSubjectId(subjectId, async () => { + const promise = await apply>(); + return await promise; + }); done(); return result; } catch (e) { @@ -248,9 +254,13 @@ export class API { } }); } + + let subjectId: SubjectId | undefined = undefined; return wrapAsyncGenerator( (async function* () { try { + subjectId = await auth(); + const generator = await apply>(); for await (const item of generator) { yield item; @@ -260,7 +270,7 @@ export class API { handleError(e); } })(), - withRequestContext, + (f) => runWithSubjectId(subjectId, f), ); }; }, @@ -271,8 +281,8 @@ export class API { const cookieHeader = (context.requestHeader.get("cookie") || "") as string; try { const claims = await this.sessionHandler.verifyJWTCookie(cookieHeader); - const subjectId = claims?.sub; - return subjectId; + const userId = claims?.sub; + return userId; } catch (error) { log.warn("Failed to authenticate user with JWT Session", error); return undefined; diff --git a/components/server/src/api/workspace-service-api.ts b/components/server/src/api/workspace-service-api.ts index ca38d16d58d092..9a42d274d1d53b 100644 --- a/components/server/src/api/workspace-service-api.ts +++ b/components/server/src/api/workspace-service-api.ts @@ -15,6 +15,7 @@ import { import { inject, injectable } from "inversify"; import { WorkspaceService } from "../workspace/workspace-service"; import { PublicAPIConverter } from "@gitpod/gitpod-protocol/lib/public-api-converter"; +import { ctxSignal, ctxUserId } from "../util/request-context"; @injectable() export class WorkspaceServiceAPI implements ServiceImpl { @@ -24,11 +25,11 @@ export class WorkspaceServiceAPI implements ServiceImpl { + async getWorkspace(req: GetWorkspaceRequest, _: HandlerContext): Promise { if (!req.workspaceId) { throw new ConnectError("workspaceId is required", Code.InvalidArgument); } - const info = await this.workspaceService.getWorkspace(context.user.id, req.workspaceId); + const info = await this.workspaceService.getWorkspace(ctxUserId(), req.workspaceId); const response = new GetWorkspaceResponse(); response.workspace = this.apiConverter.toWorkspace(info); return response; @@ -36,10 +37,10 @@ export class WorkspaceServiceAPI implements ServiceImpl { if (req.workspaceId) { - const instance = await this.workspaceService.getCurrentInstance(context.user.id, req.workspaceId); + const instance = await this.workspaceService.getCurrentInstance(ctxUserId(), req.workspaceId); const status = this.apiConverter.toWorkspace(instance).status; if (status) { const response = new WatchWorkspaceStatusResponse(); @@ -48,7 +49,7 @@ export class WorkspaceServiceAPI implements ServiceImpl = new Map( + Object.keys(SubjectKindNames).map((k) => { + return [SubjectKindNames[k as SubjectKind], k as SubjectKind]; + }), +); + +export class SubjectId { + private static readonly SEPARATOR = "_"; + + constructor(public readonly kind: SubjectKind, public readonly value: string) {} + + public static fromUserId(userId: string): SubjectId { + return new SubjectId("user", userId); + } + + public static is(obj: any): obj is SubjectId { + return !!obj && obj instanceof SubjectId; + } + + public static isSubjectKind(str: string): str is SubjectKind { + return !!SubjectKindNames[str as SubjectKind]; + } + + public toString(): string { + const prefix = SubjectKindNames[this.kind]; + return prefix + SubjectId.SEPARATOR + this.value; + } + + public static parse(str: string): SubjectId | undefined { + try { + return SubjectId.tryParse(str); + } catch (err) { + return undefined; + } + } + + public static tryParse(str: string): SubjectId { + const parts = str.split(SubjectId.SEPARATOR); + if (parts.length < 2) { + throw new Error(`Unable to parse SubjectId`); + } + const kind = SubjectKindByShortName.get(parts[0]); + if (!kind) { + throw new Error(`Unable to parse SubjectId: unknown SubjectKind!`); + } + const value = parts.slice(1).join(); + return new SubjectId(kind, value); + } + + public userId(): string | undefined { + if (this.kind === "user") { + return this.value; + } + return undefined; + } + + public equals(other: SubjectId): boolean { + return this.kind === other.kind && this.value === other.value; + } +} + +// The following codes is meant for backwards-compatibility with the existing express types, or other code, that relies on `userId: string` or `User` +/** + * Interface type meant for backwards compatibility + */ +export type Subject = string | SubjectId; diff --git a/components/server/src/authorization/caching-spicedb-authorizer.spec.db.ts b/components/server/src/authorization/caching-spicedb-authorizer.spec.db.ts index 30f363a5eda208..16e562ad68c36c 100644 --- a/components/server/src/authorization/caching-spicedb-authorizer.spec.db.ts +++ b/components/server/src/authorization/caching-spicedb-authorizer.spec.db.ts @@ -17,12 +17,27 @@ import { WorkspaceService } from "../workspace/workspace-service"; import { UserService } from "../user/user-service"; import { ConfigProvider } from "../workspace/config-provider"; import { v1 } from "@authzed/authzed-node"; -import { runWithContext } from "../util/request-context"; +import { runWithRequestContext } from "../util/request-context"; import { RequestLocalZedTokenCache } from "./spicedb-authorizer"; +import { Subject, SubjectId } from "../auth/subject-id"; const expect = chai.expect; -const withCtx = (p: Promise) => runWithContext("test", {}, () => p); +const withCtx = (subject: Subject | User, p: Promise | (() => Promise)) => + runWithRequestContext( + { + requestKind: "testContext", + requestMethod: "testMethod", + signal: new AbortController().signal, + subjectId: SubjectId.is(subject) ? subject : SubjectId.fromUserId(User.is(subject) ? subject.id : subject), + }, + () => { + if (typeof p === "function") { + return p(); + } + return p; + }, + ); describe("CachingSpiceDBAuthorizer", async () => { let container: Container; @@ -60,8 +75,8 @@ describe("CachingSpiceDBAuthorizer", async () => { it("should avoid new-enemy after removal", async () => { // userB and userC are members of org1, userA is owner. // All users are installation owned. - const org1 = await withCtx(orgSvc.createOrganization(SYSTEM_USER, "org1")); const userA = await withCtx( + SYSTEM_USER, userSvc.createUser({ organizationId: undefined, identity: { @@ -71,8 +86,10 @@ describe("CachingSpiceDBAuthorizer", async () => { }, }), ); - await withCtx(orgSvc.addOrUpdateMember(SYSTEM_USER, org1.id, userA.id, "owner")); + const org1 = await withCtx(userA, orgSvc.createOrganization(userA.id, "org1")); + await withCtx(SYSTEM_USER, orgSvc.addOrUpdateMember(SYSTEM_USER, org1.id, userA.id, "owner")); const userB = await withCtx( + SYSTEM_USER, userSvc.createUser({ organizationId: undefined, identity: { @@ -82,8 +99,9 @@ describe("CachingSpiceDBAuthorizer", async () => { }, }), ); - await withCtx(orgSvc.addOrUpdateMember(SYSTEM_USER, org1.id, userB.id, "member")); + await withCtx(SYSTEM_USER, orgSvc.addOrUpdateMember(SYSTEM_USER, org1.id, userB.id, "member")); const userC = await withCtx( + SYSTEM_USER, userSvc.createUser({ organizationId: undefined, identity: { @@ -93,38 +111,38 @@ describe("CachingSpiceDBAuthorizer", async () => { }, }), ); - await withCtx(orgSvc.addOrUpdateMember(SYSTEM_USER, org1.id, userC.id, "member")); + await withCtx(SYSTEM_USER, orgSvc.addOrUpdateMember(SYSTEM_USER, org1.id, userC.id, "member")); // userA creates a workspace when userB is still member of the org // All members have "read_info" (derived from membership) - const ws1 = await withCtx(createTestWorkspace(org1, userA)); + const ws1 = await withCtx(userA, createTestWorkspace(org1, userA)); expect( - await withCtx(authorizer.hasPermissionOnWorkspace(userB.id, "read_info", ws1.id)), + await withCtx(userB, authorizer.hasPermissionOnWorkspace(userB.id, "read_info", ws1.id)), "userB should have read_info after removal", ).to.be.true; expect( - await withCtx(authorizer.hasPermissionOnWorkspace(userA.id, "read_info", ws1.id)), + await withCtx(userA, authorizer.hasPermissionOnWorkspace(userA.id, "read_info", ws1.id)), "userA should have read_info after removal of userB", ).to.be.true; expect( - await withCtx(authorizer.hasPermissionOnWorkspace(userC.id, "read_info", ws1.id)), + await withCtx(userC, authorizer.hasPermissionOnWorkspace(userC.id, "read_info", ws1.id)), "userC should have read_info after removal of userB", ).to.be.true; // userB is removed from the org - await withCtx(orgSvc.removeOrganizationMember(SYSTEM_USER, org1.id, userB.id)); + await withCtx(SYSTEM_USER, orgSvc.removeOrganizationMember(SYSTEM_USER, org1.id, userB.id)); expect( - await withCtx(authorizer.hasPermissionOnWorkspace(userB.id, "read_info", ws1.id)), + await withCtx(userB, authorizer.hasPermissionOnWorkspace(userB.id, "read_info", ws1.id)), "userB should have read_info after removal", ).to.be.false; expect( - await withCtx(authorizer.hasPermissionOnWorkspace(userA.id, "read_info", ws1.id)), + await withCtx(userA, authorizer.hasPermissionOnWorkspace(userA.id, "read_info", ws1.id)), "userA should have read_info after removal of userB", ).to.be.true; expect( - await withCtx(authorizer.hasPermissionOnWorkspace(userC.id, "read_info", ws1.id)), + await withCtx(userC, authorizer.hasPermissionOnWorkspace(userC.id, "read_info", ws1.id)), "userC should have read_info after removal of userB", ).to.be.true; }); @@ -153,8 +171,8 @@ describe("CachingSpiceDBAuthorizer", async () => { it("should avoid read-your-writes problem when adding a new user", async () => { // userB and userC are members of org1, userA is owner. // All users are installation owned. - const org1 = await withCtx(orgSvc.createOrganization(SYSTEM_USER, "org1")); const userA = await withCtx( + SYSTEM_USER, userSvc.createUser({ organizationId: undefined, identity: { @@ -164,8 +182,10 @@ describe("CachingSpiceDBAuthorizer", async () => { }, }), ); - await withCtx(orgSvc.addOrUpdateMember(SYSTEM_USER, org1.id, userA.id, "owner")); + const org1 = await withCtx(userA, orgSvc.createOrganization(userA.id, "org1")); + await withCtx(SYSTEM_USER, orgSvc.addOrUpdateMember(SYSTEM_USER, org1.id, userA.id, "owner")); const userC = await withCtx( + SYSTEM_USER, userSvc.createUser({ organizationId: undefined, identity: { @@ -175,22 +195,23 @@ describe("CachingSpiceDBAuthorizer", async () => { }, }), ); - await withCtx(orgSvc.addOrUpdateMember(SYSTEM_USER, org1.id, userC.id, "member")); + await withCtx(SYSTEM_USER, orgSvc.addOrUpdateMember(SYSTEM_USER, org1.id, userC.id, "member")); // userA creates a workspace before userB is member of the org - const ws1 = await withCtx(createTestWorkspace(org1, userA)); + const ws1 = await withCtx(userA, createTestWorkspace(org1, userA)); expect( - await withCtx(authorizer.hasPermissionOnWorkspace(userA.id, "read_info", ws1.id)), + await withCtx(SYSTEM_USER, authorizer.hasPermissionOnWorkspace(userA.id, "read_info", ws1.id)), "userA should have read_info after removal of userB", ).to.be.true; expect( - await withCtx(authorizer.hasPermissionOnWorkspace(userC.id, "read_info", ws1.id)), + await withCtx(userC, authorizer.hasPermissionOnWorkspace(userC.id, "read_info", ws1.id)), "userC should have read_info after removal of userB", ).to.be.true; // userB is added to the org const userB = await withCtx( + SYSTEM_USER, userSvc.createUser({ organizationId: undefined, identity: { @@ -200,18 +221,18 @@ describe("CachingSpiceDBAuthorizer", async () => { }, }), ); - await withCtx(orgSvc.addOrUpdateMember(SYSTEM_USER, org1.id, userB.id, "member")); + await withCtx(SYSTEM_USER, orgSvc.addOrUpdateMember(SYSTEM_USER, org1.id, userB.id, "member")); expect( - await withCtx(authorizer.hasPermissionOnWorkspace(userB.id, "read_info", ws1.id)), + await withCtx(userB, authorizer.hasPermissionOnWorkspace(userB.id, "read_info", ws1.id)), "userB should have read_info after removal", ).to.be.true; expect( - await withCtx(authorizer.hasPermissionOnWorkspace(userA.id, "read_info", ws1.id)), + await withCtx(userA, authorizer.hasPermissionOnWorkspace(userA.id, "read_info", ws1.id)), "userA should have read_info after removal of userB", ).to.be.true; expect( - await withCtx(authorizer.hasPermissionOnWorkspace(userC.id, "read_info", ws1.id)), + await withCtx(userC, authorizer.hasPermissionOnWorkspace(userC.id, "read_info", ws1.id)), "userC should have read_info after removal of userB", ).to.be.true; }); @@ -253,7 +274,7 @@ describe("RequestLocalZedTokenCache", async () => { }); it("should store token", async () => { - await runWithContext("test", {}, async () => { + await withCtx(SYSTEM_USER, async () => { expect(await cache.get(ws1)).to.be.undefined; await cache.set([ws1, rawToken1]); expect(await cache.get(ws1)).to.equal(rawToken1); @@ -261,7 +282,7 @@ describe("RequestLocalZedTokenCache", async () => { }); it("should return newest token", async () => { - await runWithContext("test", {}, async () => { + await withCtx(SYSTEM_USER, async () => { await cache.set([ws1, rawToken1]); await cache.set([ws1, rawToken2]); expect(await cache.get(ws1)).to.equal(rawToken2); @@ -271,7 +292,7 @@ describe("RequestLocalZedTokenCache", async () => { }); it("should return proper consistency", async () => { - await runWithContext("test", {}, async () => { + await withCtx(SYSTEM_USER, async () => { expect(await cache.consistency(ws1)).to.deep.equal(fullyConsistent()); await cache.set([ws1, rawToken1]); expect(await cache.consistency(ws1)).to.deep.equal(atLeastAsFreshAs(rawToken1)); @@ -279,7 +300,7 @@ describe("RequestLocalZedTokenCache", async () => { }); it("should clear cache", async () => { - await runWithContext("test", {}, async () => { + await withCtx(SYSTEM_USER, async () => { await cache.set([ws1, rawToken1]); expect(await cache.get(ws1)).to.equal(rawToken1); await cache.set([ws1, undefined]); // this should trigger a clear diff --git a/components/server/src/authorization/spicedb-authorizer.ts b/components/server/src/authorization/spicedb-authorizer.ts index 6c45d9efa7ac76..66c9d3ca919247 100644 --- a/components/server/src/authorization/spicedb-authorizer.ts +++ b/components/server/src/authorization/spicedb-authorizer.ts @@ -14,8 +14,7 @@ import * as grpc from "@grpc/grpc-js"; import { isFgaChecksEnabled, isFgaWritesEnabled } from "./authorizer"; import { base64decode } from "@jmondi/oauth2-server"; import { DecodedZedToken } from "@gitpod/spicedb-impl/lib/impl/v1/impl.pb"; -import { RequestContext } from "node-fetch"; -import { getRequestContext } from "../util/request-context"; +import { ctxGetCache, ctxSetCache } from "../util/request-context"; import { ApplicationError, ErrorCodes } from "@gitpod/gitpod-protocol/lib/messaging/error"; async function tryThree(errMessage: string, code: (attempt: number) => Promise): Promise { @@ -240,10 +239,14 @@ interface ZedTokenCache { consistency(resourceRef: v1.ObjectReference | undefined): Promise; } -type ContextWithZedToken = RequestContext & { zedToken?: StoredZedToken }; -function getContext(): ContextWithZedToken { - return getRequestContext() as ContextWithZedToken; -} +// "contribute" a cache shape to the request context +type ZedTokenCacheType = StoredZedToken; +const ctxCacheSetZedToken = (zedToken: StoredZedToken | undefined): void => { + ctxSetCache("zedToken", zedToken); +}; +const ctxCacheGetZedToken = (): StoredZedToken | undefined => { + return ctxGetCache("zedToken"); +}; /** * This is a simple implementation of the ZedTokenCache that uses the local context to store single ZedToken per API request, which is stored in AsyncLocalStorage. @@ -253,12 +256,12 @@ export class RequestLocalZedTokenCache implements ZedTokenCache { constructor() {} async get(objectRef: v1.ObjectReference | undefined): Promise { - return getContext().zedToken?.token; + return ctxCacheGetZedToken()?.token; } async set(...kvs: ZedTokenCacheKV[]) { function clearZedTokenOnContext() { - getContext().zedToken = undefined; + ctxCacheSetZedToken(undefined); } const mustClearCache = kvs.some(([k, v]) => !!k && !v); // did we write a relationship without getting a writtenAt token? @@ -270,11 +273,11 @@ export class RequestLocalZedTokenCache implements ZedTokenCache { try { const allTokens = [ ...kvs.map(([_, v]) => (!!v ? StoredZedToken.fromToken(v) : undefined)), - getContext().zedToken, + ctxCacheGetZedToken(), ].filter((v) => !!v) as StoredZedToken[]; const freshest = this.freshest(...allTokens); if (freshest) { - getContext().zedToken = freshest; + ctxCacheSetZedToken(freshest); } } catch (err) { log.warn("[spicedb] Failed to set ZedToken on context", err); diff --git a/components/server/src/container-module.ts b/components/server/src/container-module.ts index 4e369eed489785..f72f4406ca79d3 100644 --- a/components/server/src/container-module.ts +++ b/components/server/src/container-module.ts @@ -130,6 +130,7 @@ import { WorkspaceService } from "./workspace/workspace-service"; import { WorkspaceStartController } from "./workspace/workspace-start-controller"; import { WorkspaceStarter } from "./workspace/workspace-starter"; import { DefaultWorkspaceImageValidator } from "./orgs/default-workspace-image-validator"; +import { ContextAwareAnalyticsWriter } from "./analytics"; export const productionContainerModule = new ContainerModule( (bind, unbind, isBound, rebind, unbindAsync, onActivation, onDeactivation) => { @@ -249,7 +250,12 @@ export const productionContainerModule = new ContainerModule( bind(CodeSyncService).toSelf().inSingletonScope(); - bind(IAnalyticsWriter).toDynamicValue(newAnalyticsWriterFromEnv).inSingletonScope(); + bind(IAnalyticsWriter) + .toDynamicValue((ctx) => { + const writer = newAnalyticsWriterFromEnv(); + return new ContextAwareAnalyticsWriter(writer); + }) + .inSingletonScope(); bind(OAuthController).toSelf().inSingletonScope(); diff --git a/components/server/src/jobs/runner.ts b/components/server/src/jobs/runner.ts index c1b08614c1839e..cc6c80577f6c57 100644 --- a/components/server/src/jobs/runner.ts +++ b/components/server/src/jobs/runner.ts @@ -18,7 +18,9 @@ import { WorkspaceGarbageCollector } from "./workspace-gc"; import { SnapshotsJob } from "./snapshots"; import { RelationshipUpdateJob } from "../authorization/relationship-updater-job"; import { WorkspaceStartController } from "../workspace/workspace-start-controller"; -import { runWithContext } from "../util/request-context"; +import { runWithRequestContext } from "../util/request-context"; +import { SYSTEM_USER } from "../authorization/authorizer"; +import { SubjectId } from "../auth/subject-id"; export const Job = Symbol("Job"); @@ -81,7 +83,13 @@ export class JobRunner { try { await this.mutex.using([job.name, ...(job.lockedResources || [])], job.frequencyMs, async (signal) => { - await runWithContext(job.name, {}, async () => { + const ctx = { + signal, + requestKind: "job", + requestMethod: job.name, + subjectId: SubjectId.fromUserId(SYSTEM_USER), + }; + await runWithRequestContext(ctx, async () => { log.info(`Acquired lock for job ${job.name}.`, logCtx); // we want to hold the lock for the entire duration of the job, so we return earliest after frequencyMs const timeout = new Promise((resolve) => setTimeout(resolve, job.frequencyMs)); diff --git a/components/server/src/messaging/redis-subscriber.ts b/components/server/src/messaging/redis-subscriber.ts index 580e3286b99f7b..d2a780aa9f2fd6 100644 --- a/components/server/src/messaging/redis-subscriber.ts +++ b/components/server/src/messaging/redis-subscriber.ts @@ -29,7 +29,9 @@ import { } from "../prometheus-metrics"; import { Redis } from "ioredis"; import { WorkspaceDB } from "@gitpod/gitpod-db/lib"; -import { runWithContext } from "../util/request-context"; +import { runWithRequestContext } from "../util/request-context"; +import { SYSTEM_USER } from "../authorization/authorizer"; +import { SubjectId } from "../auth/subject-id"; const UNDEFINED_KEY = "undefined"; @@ -55,7 +57,13 @@ export class RedisSubscriber { } this.redis.on("message", async (channel: string, message: string) => { - await runWithContext("redis-subscriber", {}, async () => { + const ctx = { + signal: new AbortSignal(), + requestKind: "redis-subscriber", + requestMethod: channel, + subjectId: SubjectId.fromUserId(SYSTEM_USER), + }; + await runWithRequestContext(ctx, async () => { reportRedisUpdateReceived(channel); let err: Error | undefined; diff --git a/components/server/src/prebuilds/github-app.ts b/components/server/src/prebuilds/github-app.ts index ecdee252bfdea1..1d68515eafe687 100644 --- a/components/server/src/prebuilds/github-app.ts +++ b/components/server/src/prebuilds/github-app.ts @@ -41,7 +41,9 @@ import { RepoURL } from "../repohost"; import { ApplicationError, ErrorCode } from "@gitpod/gitpod-protocol/lib/messaging/error"; import { UserService } from "../user/user-service"; import { ProjectsService } from "../projects/projects-service"; +import { runWithSubjectId, runWithRequestContext } from "../util/request-context"; import { SYSTEM_USER } from "../authorization/authorizer"; +import { SubjectId } from "../auth/subject-id"; /** * GitHub app urls: @@ -112,6 +114,20 @@ export class GithubApp { res.redirect(301, this.getBadgeImageURL()); }); + // Use RequestContext + options.getRouter && + options.getRouter().use((req, res, next) => { + runWithRequestContext( + { + requestKind: "probot", + requestMethod: req.path, + signal: new AbortController().signal, + }, + () => next(), + ); + next(); + }); + app.on("installation.created", (ctx: Context<"installation.created">) => { catchError( (async () => { @@ -159,9 +175,11 @@ export class GithubApp { // To implement this in a more robust way, we'd need to store `repository.id` with the project, next to the cloneUrl. const oldName = (ctx.payload as any)?.changes?.repository?.name?.from; if (oldName) { - const projects = await this.projectService.findProjectsByCloneUrl( - SYSTEM_USER, - `https://github.com/${repository.owner.login}/${oldName}.git`, + const projects = await runWithSubjectId(SubjectId.fromUserId(SYSTEM_USER), async () => + this.projectService.findProjectsByCloneUrl( + SYSTEM_USER, + `https://github.com/${repository.owner.login}/${oldName}.git`, + ), ); for (const project of projects) { project.cloneUrl = repository.clone_url; @@ -283,7 +301,9 @@ export class GithubApp { const contextURL = `${repo.html_url}/tree/${branch}`; span.setTag("contextURL", contextURL); const context = (await this.contextParser.handle({ span }, installationOwner, contextURL)) as CommitContext; - const projects = await this.projectService.findProjectsByCloneUrl(SYSTEM_USER, context.repository.cloneUrl); + const projects = await runWithSubjectId(SubjectId.fromUserId(SYSTEM_USER), async () => + this.projectService.findProjectsByCloneUrl(SYSTEM_USER, context.repository.cloneUrl), + ); for (const project of projects) { try { const user = await this.findProjectOwner(project, installationOwner); diff --git a/components/server/src/prebuilds/github-enterprise-app.ts b/components/server/src/prebuilds/github-enterprise-app.ts index 9b185b0d224e4f..5ad2ee77907cca 100644 --- a/components/server/src/prebuilds/github-enterprise-app.ts +++ b/components/server/src/prebuilds/github-enterprise-app.ts @@ -23,6 +23,8 @@ import { UserService } from "../user/user-service"; import { ApplicationError, ErrorCodes } from "@gitpod/gitpod-protocol/lib/messaging/error"; import { ProjectsService } from "../projects/projects-service"; import { SYSTEM_USER } from "../authorization/authorizer"; +import { runWithSubjectId } from "../util/request-context"; +import { SubjectId } from "../auth/subject-id"; @injectable() export class GitHubEnterpriseApp { @@ -258,7 +260,9 @@ export class GitHubEnterpriseApp { private async findProjectOwners(cloneURL: string): Promise<{ users: User[]; project: Project } | undefined> { try { - const projects = await this.projectService.findProjectsByCloneUrl(SYSTEM_USER, cloneURL); + const projects = await runWithSubjectId(SubjectId.fromUserId(SYSTEM_USER), async () => + this.projectService.findProjectsByCloneUrl(SYSTEM_USER, cloneURL), + ); const project = projects[0]; if (project) { const users = []; diff --git a/components/server/src/server.ts b/components/server/src/server.ts index 1e360a3777ab16..bd5f5e4d543a73 100644 --- a/components/server/src/server.ts +++ b/components/server/src/server.ts @@ -47,7 +47,8 @@ import { GitHubEnterpriseApp } from "./prebuilds/github-enterprise-app"; import { JobRunner } from "./jobs/runner"; import { RedisSubscriber } from "./messaging/redis-subscriber"; import { HEADLESS_LOGS_PATH_PREFIX, HEADLESS_LOG_DOWNLOAD_PATH_PREFIX } from "./workspace/headless-log-service"; -import { runWithLogContext } from "./util/log-context"; +import { runWithRequestContext } from "./util/request-context"; +import { SubjectId } from "./auth/subject-id"; @injectable() export class Server { @@ -140,14 +141,18 @@ export class Server { // Install passport await this.authenticator.init(app); - // log context - app.use(async (req: express.Request, res: express.Response, next: express.NextFunction) => { - try { - const userId = req.user ? req.user.id : undefined; - runWithLogContext("http", { userId, requestPath: req.path }, () => next()); - } catch (err) { - next(err); - } + // Use RequestContext for authorization + app.use((req: express.Request, res: express.Response, next: express.NextFunction) => { + const userId = req.user ? req.user.id : undefined; + runWithRequestContext( + { + requestKind: "http", + requestMethod: req.path, + signal: new AbortController().signal, + subjectId: userId ? SubjectId.fromUserId(userId) : undefined, // TODO(gpl) Can we assume this? E.g., has this been verified? It should: It means we could decode the cookie, right? + }, + () => next(), + ); }); // Ensure that host contexts of dynamic auth providers are initialized. diff --git a/components/server/src/util/log-context.ts b/components/server/src/util/log-context.ts index 1c5b3fbf709ced..b46628bcd87856 100644 --- a/components/server/src/util/log-context.ts +++ b/components/server/src/util/log-context.ts @@ -6,28 +6,37 @@ import { LogContext } from "@gitpod/gitpod-protocol/lib/util/logging"; import { performance } from "node:perf_hooks"; -import { RequestContext, getGlobalContext, runWithContext } from "./request-context"; +import { RequestContext, ctxTryGet } from "./request-context"; export type LogContextOptions = LogContext & { [p: string]: any; }; +function mapToLogContext(ctx: RequestContext): LogContextOptions { + return { + requestId: ctx.requestId, + requestKind: ctx.requestKind, + requestMethod: ctx.requestMethod, + traceId: ctx.traceId, + subjectId: ctx.subjectId?.toString(), + userId: ctx.subjectId?.userId(), + contextTimeMs: ctx?.startTime ? performance.now() - ctx.startTime : undefined, + }; +} + // we are installing a special augmenter that enhances the log context if executed within `runWithContext` // with a contextId and a contextTimeMs, which denotes the amount of milliseconds since the context was created. -export type EnhancedLogContext = RequestContext & LogContextOptions; const augmenter: LogContext.Augmenter = (ctx) => { - const globalContext = getGlobalContext(); - const contextTimeMs = globalContext?.contextTimeMs ? performance.now() - globalContext.contextTimeMs : undefined; + const requestContext = ctxTryGet(); + let derivedContext: LogContextOptions = {}; + if (requestContext) { + derivedContext = mapToLogContext(requestContext); + } const result = { - ...globalContext, - contextTimeMs, + ...derivedContext, ...ctx, }; // if its an empty object return undefined return Object.keys(result).length === 0 ? undefined : result; }; LogContext.setAugmenter(augmenter); - -export function runWithLogContext(contextKind: string, context: EnhancedLogContext, fun: () => T): T { - return runWithContext(contextKind, context, fun); -} diff --git a/components/server/src/util/request-context.ts b/components/server/src/util/request-context.ts index e762890f9ffbb7..8861410e86f6b4 100644 --- a/components/server/src/util/request-context.ts +++ b/components/server/src/util/request-context.ts @@ -7,32 +7,156 @@ import { AsyncLocalStorage } from "node:async_hooks"; import { performance } from "node:perf_hooks"; import { v4 } from "uuid"; +import { SubjectId } from "../auth/subject-id"; +import { ApplicationError, ErrorCodes } from "@gitpod/gitpod-protocol/lib/messaging/error"; +/** + * ReqeuestContext is the context that all our request-handling code runs in. + * All code has access to the contained fields by using the exported "ctx...()" functions below. + * + * It's meant to be the host all concerns we have for a request. For now, those are: + * - authorization (via subjectId) + * - caching (via cache, ctxSetCache, ctxGetCache) + * + * It's meant to be nestable, so that we can run code in a child context with different properties. + * The only example we have for now is "runWithSubjectId", which executes the child context with different authorization. + * @see runWithSubjectId + */ export interface RequestContext { - contextId?: string; - contextKind?: string; - contextTimeMs?: number; + /** + * Unique, artificial ID for this request. + */ + readonly requestId: string; + + /** + * A request kind e.g. "job", "http" or "grpc". + */ + readonly requestKind: string; + + /** + * A name. Specific values depends on requestKind. + */ + readonly requestMethod: string; + + /** + * Propagate cancellation through the handler chain. + */ + readonly signal: AbortSignal; + + /** + * The UNIX timestamp in milliseconds when request processing started. + */ + readonly startTime: number; + + /** + * A cache for request-scoped data. + */ + readonly cache: { [key: string]: any }; + + /** + * The trace ID for this request. This is used to correlate log messages and other events. + */ + readonly traceId?: string; + + /** + * The SubjectId this request is authenticated with. + */ + readonly subjectId?: SubjectId; } const asyncLocalStorage = new AsyncLocalStorage(); + +export function ctxGet(): RequestContext { + const ctx = asyncLocalStorage.getStore(); + if (!ctx) { + throw new Error("ctxGet: No request context available"); + } + return ctx; +} + +export function ctxTryGet(): RequestContext | undefined { + return asyncLocalStorage.getStore() || undefined; +} + +/** + * @returns the SubjectId this request is authenticated with, or undefined if it isn't + */ +export function ctxTrySubjectId(): SubjectId | undefined { + const ctx = ctxTryGet(); + return ctx?.subjectId; +} + +/** + * @throws 500/INTERNAL_SERVER_ERROR if there is no userId + * @returns The userId associated with the current request. + */ +export function ctxUserId(): string { + const userId = ctxGet()?.subjectId?.userId(); + if (!userId) { + throw new ApplicationError(ErrorCodes.INTERNAL_SERVER_ERROR, "No userId available"); + } + return userId; +} + +/** + * @throws 408/REQUEST_TIMEOUT if the request has been aborted + */ +export function ctxCheckAborted() { + if (ctxGet().signal.aborted) { + throw new ApplicationError(ErrorCodes.REQUEST_TIMEOUT, "Request aborted"); + } +} + +/** + * @returns The AbortSignal associated with the current request. + */ +export function ctxSignal() { + return ctxGet().signal; +} + +/** Encode cache keys in type to avoid clashes at compile time */ +type CacheKey = "zedToken"; +export function ctxGetCache(key: CacheKey, d: T | undefined = undefined): T | undefined { + return ctxGet().cache[key] || d; +} + +type UpdateCache = (prev: T | undefined) => T | undefined; +export function ctxSetCache(key: CacheKey, value: T | undefined | UpdateCache) { + if (typeof value === "function") { + const prev = ctxGetCache(key); + value = value(prev); + } + ctxGet().cache[key] = value; +} + +export type RequestContextSeed = Omit & { + requestId?: string; + startTime?: number; +}; + /** - * !!! Only to be used by selected internal code !!! + * The context all our request-handling code should run in. + * @param context + * @param fun * @returns */ -export function getGlobalContext(): RequestContext | undefined { - return asyncLocalStorage.getStore(); +export function runWithRequestContext(context: RequestContextSeed, fun: () => T): T { + const requestId = context.requestId || v4(); + const startTime = context.startTime || performance.now(); + const cache = {}; + return runWithContext({ ...context, requestId, startTime, cache }, fun); } -export function runWithContext(contextKind: string, context: C, fun: () => T): T { - return asyncLocalStorage.run( - { - ...context, - contextKind, - contextId: context.contextId || v4(), - contextTimeMs: context.contextTimeMs || performance.now(), - }, - fun, - ); +export function runWithSubjectId(subjectId: SubjectId | undefined, fun: () => T): T { + const parent = ctxTryGet(); + if (!parent) { + throw new Error("runWithChildContext: No parent context available"); + } + return runWithContext({ ...parent, subjectId }, fun); +} + +function runWithContext(context: C, fun: () => T): T { + return asyncLocalStorage.run(context, fun); } export type AsyncGeneratorDecorator = (f: () => T) => T; @@ -50,7 +174,3 @@ export function wrapAsyncGenerator( }, }; } - -export function getRequestContext(): RequestContext { - return asyncLocalStorage.getStore() || {}; // to ease usage we hand out an empty shape here -} diff --git a/components/server/src/websocket/websocket-connection-manager.ts b/components/server/src/websocket/websocket-connection-manager.ts index 7d2e8d4af0e549..c4e20aba61f1ee 100644 --- a/components/server/src/websocket/websocket-connection-manager.ts +++ b/components/server/src/websocket/websocket-connection-manager.ts @@ -49,7 +49,8 @@ import * as opentracing from "opentracing"; import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing"; import { GitpodHostUrl } from "@gitpod/gitpod-protocol/lib/util/gitpod-host-url"; import { maskIp } from "../analytics"; -import { runWithLogContext } from "../util/log-context"; +import { runWithRequestContext } from "../util/request-context"; +import { SubjectId } from "../auth/subject-id"; export type GitpodServiceFactory = () => GitpodServerImpl; @@ -376,18 +377,20 @@ class GitpodJsonRpcProxyFactory extends JsonRpcProxyFactory protected async onRequest(method: string, ...args: any[]): Promise { const span = TraceContext.startSpan(method, undefined); const userId = this.clientMetadata.userId; - const requestId = span.context().toTraceId(); - return runWithLogContext( - "request", + const rpcSignal = args[args.length - 1]; + const signal = rpcSignal ? (rpcSignal as AbortSignal) : new AbortController().signal; + return runWithRequestContext( { - userId, - contextId: requestId, - method, + requestKind: "jsonrpc", + requestMethod: method, + signal, + subjectId: userId ? SubjectId.fromUserId(userId) : undefined, + traceId: span.context().toTraceId(), }, () => { try { // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - return this.internalOnRequest(span, requestId, method, ...args); + return this.internalOnRequest(span, method, ...args); } finally { span.finish(); } @@ -395,12 +398,7 @@ class GitpodJsonRpcProxyFactory extends JsonRpcProxyFactory ); } - private async internalOnRequest( - span: opentracing.Span, - requestId: string, - method: string, - ...args: any[] - ): Promise { + private async internalOnRequest(span: opentracing.Span, method: string, ...args: any[]): Promise { const userId = this.clientMetadata.userId; const ctx = { span }; const timer = apiCallDurationHistogram.startTimer(); diff --git a/components/server/src/workspace/gitpod-server-impl.ts b/components/server/src/workspace/gitpod-server-impl.ts index d19cdc47acf722..3cfac56e602dc4 100644 --- a/components/server/src/workspace/gitpod-server-impl.ts +++ b/components/server/src/workspace/gitpod-server-impl.ts @@ -174,6 +174,8 @@ import { suggestionFromRecentWorkspace, suggestionFromUserRepo, } from "./suggested-repos-sorter"; +import { SubjectId } from "../auth/subject-id"; +import { runWithSubjectId } from "../util/request-context"; // shortcut export const traceWI = (ctx: TraceContext, wi: Omit) => TraceContext.setOWI(ctx, wi); // userId is already taken care of in WebsocketConnectionManager @@ -466,11 +468,14 @@ export class GitpodServerImpl implements GitpodServerWithTracing, Disposable { private async checkUser(methodName?: string, logPayload?: {}, ctx?: LogContext): Promise { // Generally, a user session is required. - if (!this.userID) { + const userId = this.userID; + if (!userId) { throw new ApplicationError(ErrorCodes.NOT_AUTHENTICATED, "User is not authenticated. Please login."); } - const user = await this.userService.findUserById(SYSTEM_USER, this.userID); + const user = await runWithSubjectId(SubjectId.fromUserId(SYSTEM_USER), async () => + this.userService.findUserById(SYSTEM_USER, userId), + ); if (user.markedDeleted === true) { throw new ApplicationError(ErrorCodes.USER_DELETED, "User has been deleted."); } diff --git a/components/server/src/workspace/workspace-service.ts b/components/server/src/workspace/workspace-service.ts index b187841ec275af..daddc5e73fe85e 100644 --- a/components/server/src/workspace/workspace-service.ts +++ b/components/server/src/workspace/workspace-service.ts @@ -6,6 +6,7 @@ import { inject, injectable } from "inversify"; import * as grpc from "@grpc/grpc-js"; +import { EventIterator } from "event-iterator"; import { RedisPublisher, WorkspaceDB } from "@gitpod/gitpod-db/lib"; import { GetWorkspaceTimeoutResult, @@ -755,7 +756,7 @@ export class WorkspaceService { return urls; } - public watchWorkspaceStatus(userId: string, opts: { signal: AbortSignal }) { + public watchWorkspaceStatus(userId: string, opts: { signal: AbortSignal }): EventIterator { return generateAsyncGenerator((sink) => { try { const dispose = this.subscriber.listenForWorkspaceInstanceUpdates(userId, (_ctx, instance) => { diff --git a/components/server/src/workspace/workspace-starter.ts b/components/server/src/workspace/workspace-starter.ts index cc7edf7ef8fc24..667413610ea169 100644 --- a/components/server/src/workspace/workspace-starter.ts +++ b/components/server/src/workspace/workspace-starter.ts @@ -132,6 +132,8 @@ import { RedlockAbortSignal } from "redlock"; import { ConfigProvider } from "./config-provider"; import { isGrpcError } from "@gitpod/gitpod-protocol/lib/util/grpc"; import { getExperimentsClientForBackend } from "@gitpod/gitpod-protocol/lib/experiments/configcat-server"; +import { SubjectId } from "../auth/subject-id"; +import { runWithSubjectId } from "../util/request-context"; export interface StartWorkspaceOptions extends GitpodServer.StartWorkspaceOptions { excludeFeatureFlags?: NamedWorkspaceFeatureFlag[]; @@ -488,7 +490,9 @@ export class WorkspaceStarter { if (blockedRepository.blockUser) { try { - await this.userService.blockUser(SYSTEM_USER, user.id, true); + await runWithSubjectId(SubjectId.fromUserId(SYSTEM_USER), async () => + this.userService.blockUser(SYSTEM_USER, user.id, true), + ); log.info({ userId: user.id }, "Blocked user.", { contextURL }); } catch (error) { log.error({ userId: user.id }, "Failed to block user.", error, { contextURL });