Skip to content

Commit

Permalink
nats: working on implementing sockets/channels compat layer (not done)
Browse files Browse the repository at this point in the history
  • Loading branch information
williamstein committed Jan 29, 2025
1 parent d920317 commit 4463105
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 12 deletions.
17 changes: 14 additions & 3 deletions src/packages/frontend/client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import { join } from "path";
import { redux } from "../app-framework";
import * as jetstream from "@nats-io/jetstream";
import { createSyncTable, type SyncTable } from "@cocalc/nats/sync/synctable";
import { randomId } from "@cocalc/nats/util";
import { parse_query } from "@cocalc/sync/table/util";
import { sha1 } from "@cocalc/util/misc";
import { keys } from "lodash";
import { type HubApi, initHubApi } from "@cocalc/nats/api/index";
import { Socket } from "@cocalc/nats/socket";

export class NatsClient {
/*private*/ client: WebappClient;
Expand All @@ -19,6 +21,7 @@ export class NatsClient {
public nats = nats;
public jetstream = jetstream;
public hub: HubApi;
public sessionId = randomId();

constructor(client: WebappClient) {
this.client = client;
Expand All @@ -30,7 +33,7 @@ export class NatsClient {
return this.nc;
}
const server = `${location.protocol == "https:" ? "wss" : "ws"}://${location.host}${appBasePath}/nats`;
console.log(`connecting to ${server}...`);
console.log(`NATS: connecting to ${server}...`);
try {
this.nc = await nats.connect({
servers: [server],
Expand All @@ -39,13 +42,13 @@ export class NatsClient {
pingInterval: 10000,
});
} catch (err) {
console.log("set the JWT cookie and try again");
console.log("NATS: set the JWT cookie and try again");
await fetch(join(appBasePath, "nats"));
this.nc = await nats.connect({
servers: [server],
});
}
console.log(`connected to ${server}`);
console.log(`NATS: connected to ${server}`);
return this.nc;
});

Expand Down Expand Up @@ -179,4 +182,12 @@ export class NatsClient {
this.changefeedInterest(query, true);
return await this.synctable(query, { atomic: true });
};

createSocket = async (subjects: { listen: string; send: string }) => {
return new Socket({
...subjects,
nc: await this.getConnection(),
jc: this.jc,
});
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ export class Terminal<T extends CodeEditorState = CodeEditorState> {
rows?: number;
cols?: number;
payload: any;
ignore?: string;
id?: number;
}): void {
//console.log("handle_mesg", this.id, mesg);
Expand All @@ -555,7 +556,9 @@ export class Terminal<T extends CodeEditorState = CodeEditorState> {
this.no_ignore();
break;
case "close":
this.close_request();
if (mesg.ignore != this.id) {
this.close_request();
}
break;
case "computeServerId":
if (this.actions.store != null && this.actions.setState != null) {
Expand Down
12 changes: 12 additions & 0 deletions src/packages/frontend/project/websocket/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,18 @@ export class API {
return this.conn.channel(channel_name);
};

terminal1 = async (path: string, options: object = {}): Promise<Channel> => {
const subjects = await this.call(
{
cmd: "terminal",
path,
options,
},
20000,
);
return (await webapp_client.nats_client.createSocket(subjects)) as any;
};

project_info = async (): Promise<Channel> => {
const channel_name = await this.primusCall({ cmd: "project_info" }, 60000);
return this.conn.channel(channel_name);
Expand Down
3 changes: 2 additions & 1 deletion src/packages/nats/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"exports": {
"./sync/*": "./dist/sync/*.js",
"./api": "./dist/api/index.js",
"./api/*": "./dist/api/*.js"
"./api/*": "./dist/api/*.js",
"./*": "./dist/*.js"
},
"scripts": {
"preinstall": "npx only-allow pnpm",
Expand Down
47 changes: 47 additions & 0 deletions src/packages/nats/socket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Implement a websocket as exposed in Primus over NATS.
*/

import { EventEmitter } from "events";

export class Socket extends EventEmitter {
private listen: string;
private send: string;
private nc;
private jc;

constructor({
listen,
send,
nc,
jc,
}: {
// subject to listen on
listen: string;
// subject to write to
send: string;
// nats connection
nc;
// json codec
jc;
}) {
super();
this.listen = listen;
this.send = send;
this.nc = nc;
this.jc = jc;
this.startListening();
}

private startListening = async () => {
const sub = this.nc.subscribe(this.listen);
for await (const mesg of sub) {
const { data } = this.jc.decode(mesg.data) ?? ({} as any);
this.emit("data", data);
}
};

write(data) {
this.nc.publish(this.send, this.jc.encode({ data }));
}
}
7 changes: 7 additions & 0 deletions src/packages/nats/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import generateVouchers from "@cocalc/util/vouchers";

// nice alphanumeric string that can be used as nats subject, and very
// unlikely to randomly collide with another browser tab from this account.
export function randomId() {
return generateVouchers({ count: 1, length: 10 })[0];
}
9 changes: 3 additions & 6 deletions src/packages/terminal/lib/terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,12 +506,9 @@ export class Terminal {
spark.write({ cmd: "size", rows, cols });
}
}
// broadcast message to all other clients telling them to close.
this.channel.forEach((spark0, id, _) => {
if (id !== spark.id) {
spark0.write({ cmd: "close" });
}
});
// broadcast message to all clients telling them to close, but
// telling requestor to ignore.
spark.write({ cmd: "close", ignore: spark.id });
};

private writeToPty = async (data) => {
Expand Down
1 change: 0 additions & 1 deletion src/packages/terminal/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ export interface Options {

export interface PrimusChannel extends EventEmitter {
write: (data: object | string) => void;
forEach: (cb: (spark, id, connections) => void) => void;
destroy: () => void;
// createSpark is not on the real PrimusChannel, but it's part of our mock version for
// unit testing in support.ts
Expand Down

0 comments on commit 4463105

Please sign in to comment.