Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[papi] add watchWorkspace API #19010

Merged
merged 11 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions components/dashboard/src/service/json-rpc-workspace-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@
* See License.AGPL.txt in the project root for license information.
*/

import { Code, ConnectError, PromiseClient } from "@connectrpc/connect";
import { CallOptions, Code, ConnectError, PromiseClient } from "@connectrpc/connect";
import { PartialMessage } from "@bufbuild/protobuf";
import { WorkspaceService } from "@gitpod/public-api/lib/gitpod/v1/workspace_connect";
import { GetWorkspaceRequest, GetWorkspaceResponse } from "@gitpod/public-api/lib/gitpod/v1/workspace_pb";
import {
GetWorkspaceRequest,
GetWorkspaceResponse,
WatchWorkspaceStatusRequest,
WatchWorkspaceStatusResponse,
} from "@gitpod/public-api/lib/gitpod/v1/workspace_pb";
import { converter } from "./public-api";
import { getGitpodService } from "./service";
import { generateAsyncGenerator } from "@gitpod/gitpod-protocol/lib/generate-async-generator";
import { WorkspaceInstance } from "@gitpod/gitpod-protocol";

export class JsonRpcWorkspaceClient implements PromiseClient<typeof WorkspaceService> {
async getWorkspace(request: PartialMessage<GetWorkspaceRequest>): Promise<GetWorkspaceResponse> {
Expand All @@ -22,4 +29,55 @@ export class JsonRpcWorkspaceClient implements PromiseClient<typeof WorkspaceSer
result.item = workspace;
return result;
}

async *watchWorkspaceStatus(
request: PartialMessage<WatchWorkspaceStatusRequest>,
options?: CallOptions,
): AsyncIterable<WatchWorkspaceStatusResponse> {
if (!options?.signal) {
throw new ConnectError("signal is required", Code.InvalidArgument);
}
if (request.workspaceId) {
const resp = await this.getWorkspace({ id: request.workspaceId });
mustard-mh marked this conversation as resolved.
Show resolved Hide resolved
if (resp.item?.status) {
const response = new WatchWorkspaceStatusResponse();
response.workspaceId = resp.item.id;
response.status = resp.item.status;
yield response;
}
}
const it = generateAsyncGenerator<WorkspaceInstance>(
(queue) => {
try {
const dispose = getGitpodService().registerClient({
onInstanceUpdate: (instance) => {
queue.push(instance);
},
});
return () => {
dispose.dispose();
};
} catch (e) {
queue.fail(e);
}
},
{ signal: options.signal },
);
for await (const item of it) {
if (!item) {
continue;
}
if (request.workspaceId && item.workspaceId !== request.workspaceId) {
continue;
}
const status = converter.toWorkspace(item).status;
if (!status) {
continue;
}
const response = new WatchWorkspaceStatusResponse();
response.workspaceId = item.workspaceId;
response.status = status;
yield response;
}
}
}
1 change: 1 addition & 0 deletions components/gitpod-protocol/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"analytics-node": "^6.0.0",
"configcat-node": "^8.0.0",
"cookie": "^0.4.2",
"event-iterator": "^2.0.0",
"express": "^4.17.3",
"google-protobuf": "^3.19.1",
"inversify": "^6.0.1",
Expand Down
185 changes: 185 additions & 0 deletions components/gitpod-protocol/src/generate-async-generator.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/**
* Copyright (c) 2023 Gitpod GmbH. All rights reserved.
* Licensed under the GNU Affero General Public License (AGPL).
* See License.AGPL.txt in the project root for license information.
*/

import { suite, test } from "@testdeck/mocha";
import * as chai from "chai";

import { generateAsyncGenerator } from "./generate-async-generator";
import { Disposable } from "./util/disposable";

const expect = chai.expect;

function watchWith(times: number, listener: (value: number) => void): Disposable {
let i = 0;
const cancel = setInterval(() => {
if (i < times) {
listener(i++);
}
}, 100);
return {
dispose: () => {
clearInterval(cancel);
},
};
}

const error = new Error("Test error");
interface Ref {
isDisposed: boolean;
result: number[];
watchStarted: boolean;
}

interface Option {
errorAfter?: number;
times: number;
abortAfterMs?: number;
setupError?: boolean;
}

function watchIterator(ref: Ref, opts: Option) {
const abortController = new AbortController();
setTimeout(() => {
abortController.abort();
}, opts.abortAfterMs ?? 600);
return generateAsyncGenerator<number>(
(sink) => {
try {
if (opts.setupError) {
throw error;
}
ref.watchStarted = true;
const dispose = watchWith(opts.times, (v) => {
if (opts.errorAfter && opts.errorAfter === v) {
sink.fail(error);
return;
}
sink.push(v);
});
return () => {
ref.isDisposed = true;
dispose.dispose();
};
} catch (e) {
sink.fail(e as any as Error);
}
},
{ signal: abortController.signal },
);
}

@suite
class TestGenerateAsyncGenerator {
@test public async "happy path"() {
const ref: Ref = { isDisposed: false, result: [], watchStarted: false };
const it = watchIterator(ref, { times: 5 });
try {
for await (const v of it) {
ref.result.push(v);
}
expect.fail("should throw error");
} catch (e) {
if (ref.watchStarted) {
expect(ref.isDisposed).to.be.equal(true);
}
expect(e.message).to.be.equal("Abort error");
expect(ref.result.length).to.be.equal(5);
ref.result.forEach((v, i) => expect(v).to.be.equal(i));
expect(ref.isDisposed).to.be.equal(true);
}
}

@test public async "should be stopped after abort signal is triggered"() {
const ref: Ref = { isDisposed: false, result: [], watchStarted: false };
const it = watchIterator(ref, { times: 5, abortAfterMs: 120 });
try {
for await (const v of it) {
ref.result.push(v);
}
expect.fail("should throw error");
} catch (e) {
if (ref.watchStarted) {
expect(ref.isDisposed).to.be.equal(true);
}
expect(e.message).to.be.equal("Abort error");
expect(ref.result[0]).to.be.equal(0);
expect(ref.result.length).to.be.equal(1);
ref.result.forEach((v, i) => expect(v).to.be.equal(i));
expect(ref.isDisposed).to.be.equal(true);
}
}

@test public async "should throw error if setup throws"() {
const ref: Ref = { isDisposed: false, result: [], watchStarted: false };
const it = watchIterator(ref, { times: 5, setupError: true });
try {
for await (const v of it) {
ref.result.push(v);
}
expect.fail("should throw error");
} catch (e) {
if (ref.watchStarted) {
expect(ref.isDisposed).to.be.equal(true);
}
expect(e).to.be.equal(error);
expect(ref.result.length).to.be.equal(0);
ref.result.forEach((v, i) => expect(v).to.be.equal(i));
expect(ref.isDisposed).to.be.equal(false);
}
}

@test public async "should propagate errors from sink.next"() {
const ref: Ref = { isDisposed: false, result: [], watchStarted: false };
const it = watchIterator(ref, { times: 5, errorAfter: 2 });
try {
for await (const v of it) {
ref.result.push(v);
}
expect.fail("should throw error");
} catch (e) {
if (ref.watchStarted) {
expect(ref.isDisposed).to.be.equal(true);
}
expect(e).to.be.equal(error);
expect(ref.result.length).to.be.equal(2);
ref.result.forEach((v, i) => expect(v).to.be.equal(i));
expect(ref.isDisposed).to.be.equal(true);
}
}

@test public async "should not start iterator if pre throw error in an iterator"() {
const ref: Ref = { isDisposed: false, result: [], watchStarted: false };
const it = this.mockWatchWorkspaceStatus(ref, { times: 5, errorAfter: 2 });
try {
for await (const v of it) {
ref.result.push(v);
}
expect.fail("should throw error");
} catch (e) {
expect(ref.watchStarted).to.be.equal(false);
if (ref.watchStarted) {
expect(ref.isDisposed).to.be.equal(true);
}
expect(e.message).to.be.equal("Should throw error");
expect(ref.result.length).to.be.equal(0);
ref.result.forEach((v, i) => expect(v).to.be.equal(i));
expect(ref.isDisposed).to.be.equal(false);
}
}

async *mockWatchWorkspaceStatus(ref: Ref, option: Option): AsyncIterable<number> {
const shouldThrow = true;
if (shouldThrow) {
throw new Error("Should throw error");
}
const it = watchIterator(ref, option);
for await (const item of it) {
yield item;
}
}
}

module.exports = new TestGenerateAsyncGenerator(); // Only to circumvent no usage warning :-/
35 changes: 35 additions & 0 deletions components/gitpod-protocol/src/generate-async-generator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright (c) 2023 Gitpod GmbH. All rights reserved.
* Licensed under the GNU Affero General Public License (AGPL).
* See License.AGPL.txt in the project root for license information.
*/

import { EventIterator } from "event-iterator";
import { Queue } from "event-iterator/lib/event-iterator";

/**
* Generates an asynchronous generator that yields values based on the provided setup function.
*
* the setup function that takes a queue and returns a cleanup function.
* `queue.next` method that accepts a value to be pushed to the generator.
*
* remember that setup callback MUST wrap with try catch and use `queue.fail` to propagate error
*
* Iterator will always at least end with throw an `Abort error`
*/
export function generateAsyncGenerator<T>(
setup: (queue: Queue<T>) => (() => void) | void,
opts: { signal: AbortSignal },
) {
return new EventIterator<T>((queue) => {
opts.signal.addEventListener("abort", () => {
queue.fail(new Error("Abort error"));
});
const dispose = setup(queue);
return () => {
if (dispose) {
dispose();
}
};
});
}
20 changes: 20 additions & 0 deletions components/public-api/gitpod/v1/workspace.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,32 @@ service WorkspaceService {
// +return NOT_FOUND User does not have access to a workspace with the given
// ID +return NOT_FOUND Workspace does not exist
rpc GetWorkspace(GetWorkspaceRequest) returns (GetWorkspaceResponse) {}

// WatchWorkspaceStatus watchs the workspaces status changes
//
// workspace_id +return NOT_FOUND Workspace does not exist
rpc WatchWorkspaceStatus(WatchWorkspaceStatusRequest) returns (stream WatchWorkspaceStatusResponse) {}
}

message GetWorkspaceRequest { string id = 1; }

message GetWorkspaceResponse { Workspace item = 1; }

message WatchWorkspaceStatusRequest {
// workspace_id specifies the workspace to watch
//
// +optional if empty then watch all workspaces
optional string workspace_id = 1;
}

message WatchWorkspaceStatusResponse {
// workspace_id is the ID of the workspace that has status updated
string workspace_id = 1;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need workspace_id since there's no workspace_id field in WorkspaceStatus


// status is the updated status of workspace
WorkspaceStatus status = 2;
}

// +resource get workspace
message Workspace {
string id = 1;
Expand Down
Loading
Loading