Skip to content

Commit

Permalink
add the telepresence implementation to the centralized link language
Browse files Browse the repository at this point in the history
  • Loading branch information
jdeepee committed Oct 14, 2023
1 parent d20d95e commit 54dad75
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 84 deletions.
12 changes: 10 additions & 2 deletions bootstrap-languages/centralized-p-diff-sync/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Address, Language, Interaction, HolochainLanguageDelegate, LanguageContext, AgentService } from "https://esm.sh/@perspect3vism/[email protected]";
import { LinkAdapter } from "./linksAdapter.ts";
import { TelepresenceAdapterImplementation } from "./telepresenceAdapter.ts";
import { io } from "https://esm.sh/[email protected]";

function interactions(expression: Address): Interaction[] {
return [];
Expand All @@ -13,8 +14,15 @@ const name = "centralized-perspective-diff-sync";
const uid = "centralized-perspective-diff-sync-uuid";

export default async function create(context: LanguageContext): Promise<Language> {
const linksAdapter = new LinkAdapter(context, uid);
const telepresenceAdapter = new TelepresenceAdapterImplementation(context, uid);
let socketClient = io("https://socket.ad4m.dev", {
transports: ['websocket', 'polling'],
autoConnect: true,
query: { did: context.agent.did, linkLanguageUUID: uid }
});
console.log("Created socket connection");

const linksAdapter = new LinkAdapter(context, uid, socketClient);
const telepresenceAdapter = new TelepresenceAdapterImplementation(context, uid, socketClient);

//@ts-ignore
return {
Expand Down
130 changes: 62 additions & 68 deletions bootstrap-languages/centralized-p-diff-sync/linksAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { LinkSyncAdapter, PerspectiveDiffObserver, HolochainLanguageDelegate, La
LinkExpression, DID, Perspective, PerspectiveState } from "https://esm.sh/@perspect3vism/[email protected]";
import type { SyncStateChangeObserver } from "https://esm.sh/@perspect3vism/[email protected]";
import { Mutex, withTimeout } from "https://esm.sh/[email protected]";
import { io, Socket, ServerToClientEvents, ClientToServerEvents } from "https://esm.sh/[email protected]";
import type { Socket, ServerToClientEvents, ClientToServerEvents } from "https://esm.sh/[email protected]";
import axiod from "https://deno.land/x/axiod/mod.ts";

export class LinkAdapter implements LinkSyncAdapter {
Expand All @@ -15,14 +15,10 @@ export class LinkAdapter implements LinkSyncAdapter {
socketClient: Socket<ServerToClientEvents, ClientToServerEvents>;
hasCalledSync = false;

constructor(context: LanguageContext, uid: String) {
constructor(context: LanguageContext, uid: String, socketClient: Socket<ServerToClientEvents, ClientToServerEvents>) {
this.me = context.agent.did;
this.languageUid = uid;

//this.addAgentRecord();

this.socketClient = io("https://socket.ad4m.dev", { transports: ['websocket', 'polling'], autoConnect: true });
console.log("Created socket connection");
this.socketClient = socketClient;

this.socketClient.on('error', (error: any) => {
console.error('Error:', error);
Expand All @@ -46,17 +42,17 @@ export class LinkAdapter implements LinkSyncAdapter {
//const release = await this.generalMutex.acquire();

try {
console.log("Got some live signal from the server");
console.dir(signal);
console.log(this.me);
// console.log("Got some live signal from the server");
// console.dir(signal);
// console.log(this.me);

if (this.myCurrentTime) {
console.log("With current time", this.myCurrentTime);
}
// if (this.myCurrentTime) {
// console.log("With current time", this.myCurrentTime);
// }

let serverRecordTimestamp = signal.serverRecordTimestamp;
if (!this.myCurrentTime|| this.myCurrentTime < serverRecordTimestamp) {
console.log("Returning that live signal to executor");
//console.log("Returning that live signal to executor");
this.myCurrentTime = serverRecordTimestamp;
this.updateServerSyncState();

Expand Down Expand Up @@ -97,8 +93,8 @@ export class LinkAdapter implements LinkSyncAdapter {
if (err) {
console.error("Error in update-sync-state call", err);
};
console.log("Got some result from update-sync-state");
console.dir(signal);
// console.log("Got some result from update-sync-state");
// console.dir(signal);
})
}
}
Expand All @@ -112,15 +108,26 @@ export class LinkAdapter implements LinkSyncAdapter {
}

async others(): Promise<DID[]> {
// @ts-ignore
// return await axiod.get("https://socket.ad4m.dev/getOthers", "GET", {}, {
// linkLanguageUUID: this.languageUid
// })
return [];
const others = await axiod.get("https://socket.ad4m.dev/getOthers", {
params: {
linkLanguageUUID: this.languageUid
}
});
if (others.status === 200) {
//Remove myself from others if it exists
const othersIndex = others.data.indexOf(this.me);
if (othersIndex > -1) {
others.data.splice(othersIndex, 1);
}
return others.data;
} else {
console.error("Error fetching others in linkAdapter, got status", others.status);
return [];
}
}

async currentRevision(): Promise<string> {
console.log("Getting current revision");
//console.log("Getting current revision");
let result;
try {
result = await axiod.post("https://socket.ad4m.dev/currentRevision", {
Expand Down Expand Up @@ -157,7 +164,7 @@ export class LinkAdapter implements LinkSyncAdapter {

//Call sync on the server, which will should fetch all the links we missed since last start of the link language
async sync(): Promise<PerspectiveDiff> {
console.log("Sync call has called sync", this.hasCalledSync);
//console.log("Sync call has called sync", this.hasCalledSync);
//Only allow sync to be called once since once we have sync'd once we will get future links via signal
if (!this.hasCalledSync) {
//console.log("PerspectiveDiffSync.sync(); Getting lock");
Expand All @@ -174,13 +181,13 @@ export class LinkAdapter implements LinkSyncAdapter {
console.error("Error in sync call", err);
throw Error(err);
};
console.log("Got some result from sync");
console.dir(signal);
console.log(this.me);
// console.log("Got some result from sync");
// console.dir(signal);
//console.log(this.me);

if (this.myCurrentTime) {
console.log("With current time", this.myCurrentTime);
}
// if (this.myCurrentTime) {
// console.log("With current time", this.myCurrentTime);
// }

this.myCurrentTime = signal.serverRecordTimestamp;
this.updateServerSyncState();
Expand Down Expand Up @@ -221,34 +228,34 @@ export class LinkAdapter implements LinkSyncAdapter {
}

async commit(diff: PerspectiveDiff): Promise<string> {
//const release = await this.generalMutex.acquire();
try {
const preppedDiff = {
additions: diff.additions.map((item) => prepareLinkExpression(item)),
removals: diff.removals.map((item) => prepareLinkExpression(item)),
linkLanguageUUID: this.languageUid,
did: this.me,
};
console.log("Commit sending prepped diff", preppedDiff);
//const release = await this.generalMutex.acquire();
try {
const preppedDiff = {
additions: diff.additions.map((item) => prepareLinkExpression(item)),
removals: diff.removals.map((item) => prepareLinkExpression(item)),
linkLanguageUUID: this.languageUid,
did: this.me,
};
//console.log("Commit sending prepped diff", preppedDiff);

const signal = await this.emitCommit(preppedDiff);
const signal = await this.emitCommit(preppedDiff);

if (signal.status === "Ok") {
console.log("Got some result from commit");
console.dir(signal);
//Update our local timestamp to match the server
this.myCurrentTime = signal.serverRecordTimestamp;
this.updateServerSyncState();
return ""; // Resolve the function with an empty string
} else {
throw new Error("Commit failed with non-Ok status");
}
} catch (e) {
console.error("PerspectiveDiffSync.commit(); got error", e);
throw e; // Propagate the error up
} finally {
//release();
if (signal.status === "Ok") {
console.log("Got some result from commit");
console.dir(signal);
//Update our local timestamp to match the server
this.myCurrentTime = signal.serverRecordTimestamp;
this.updateServerSyncState();
return ""; // Resolve the function with an empty string
} else {
throw new Error("Commit failed with non-Ok status");
}
} catch (e) {
console.error("PerspectiveDiffSync.commit(); got error", e);
throw e; // Propagate the error up
} finally {
//release();
}
}

// Utility method to wrap the socketClient.emit in a Promise
Expand Down Expand Up @@ -278,23 +285,10 @@ export class LinkAdapter implements LinkSyncAdapter {
async handleSignal(signal: any): Promise<void> {
//This signal only contains link data and no reference, and therefore came from us in a pull in fast_forward_signal
if (this.linkCallback) {
console.log("PerspectiveDiffSync.handleHolochainSignal: calling linkCallback", signal);
//console.log("PerspectiveDiffSync.handleHolochainSignal: calling linkCallback", signal);
await this.linkCallback(signal);
}
}

async addAgentRecord(): Promise<any> {
const others = await this.others();

if (others.filter((other) => other === this.me).length == 0) {
const result = await axiod.post("https://socket.ad4m.dev/addAgent", {
linkLanguageUUID: this.languageUid,
did: this.me
});
console.log("Added agent record with result");
console.dir(result.data);
}
}
}

function prepareLinkExpression(link: LinkExpression): object {
Expand Down
71 changes: 57 additions & 14 deletions bootstrap-languages/centralized-p-diff-sync/telepresenceAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,86 @@
import type { TelepresenceAdapter, OnlineAgent, PerspectiveExpression, DID, TelepresenceSignalCallback, HolochainLanguageDelegate, LanguageContext } from "https://esm.sh/@perspect3vism/[email protected]";;
import axiod from "https://deno.land/x/axiod/mod.ts";
import type { Socket, ServerToClientEvents, ClientToServerEvents } from "https://esm.sh/[email protected]";

export class TelepresenceAdapterImplementation implements TelepresenceAdapter {
me: DID
uuid: string;
hcDna: HolochainLanguageDelegate;
signalCallbacks: TelepresenceSignalCallback[] = [];
socketClient: Socket<ServerToClientEvents, ClientToServerEvents>;

constructor(context: LanguageContext, uuid: string) {
constructor(context: LanguageContext, uuid: string, socketClient: Socket<ServerToClientEvents, ClientToServerEvents>) {
this.hcDna = context.Holochain as HolochainLanguageDelegate;
this.me = context.agent.did;
this.uuid = uuid;
this.socketClient = socketClient;

//Add broadcast signal handler from socket to signalCallbacks
this.socketClient.on("telepresence-signal", (payload: PerspectiveExpression) => {
this.signalCallbacks.forEach(callback => {
callback(payload);
});
});
}

async setOnlineStatus(status: PerspectiveExpression): Promise<void> {
await axiod.post("https://socket.ad4m.dev/setStatus", {
const res = await axiod.post("https://socket.ad4m.dev/setAgentStatus", {
did: this.me,
link: status,
LinkLanguageUUID: this.uuid
})
status: status,
linkLanguageUUID: this.uuid
});
if (res.status === 200) {
console.log("setOnlineStatus: success");
console.log(res.data);
} else {
console.log("setOnlineStatus: failed");
console.log(res.data);
}
return null;
}

async getOnlineAgents(): Promise<OnlineAgent[]> {
const result = await axiod.post("https://socket.ad4m.dev/getOnlineAgents", {
did: this.me,
LinkLanguageUUID: this.uuid
})

// @ts-ignore
return result
const result = await axiod.get("https://socket.ad4m.dev/getOnlineAgents", {
params: {
did: this.me,
linkLanguageUUID: this.uuid
}
});
if (result.status === 200) {
console.log("getOnlineAgents: success");
console.dir(result.data);
return result.data;
} else {
console.log("getOnlineAgents: failed");
console.dir(result.data);
return [];
}
}

async sendSignal(remoteAgentDid: string, payload: PerspectiveExpression): Promise<object> {
//let res = await this.hcDna.call(DNA_NICK, ZOME_NAME, "send_signal", {remote_agent_did: remoteAgentDid, payload});
this.socketClient.emit("send-signal", { remoteAgentDid, linkLanguageUUID: this.uuid, payload }, (err, signal) => {
if (err) {
console.log("sendSignal: failed");
console.dir(err);
} else {
console.log("sendSignal: success");
console.dir(signal);
}
});
return {};
}

async sendBroadcast(payload: PerspectiveExpression): Promise<object> {
//let res = await this.hcDna.call(DNA_NICK, ZOME_NAME, "send_broadcast", payload);
this.socketClient.emit("send-broadcast", { linkLanguageUUID: this.uuid, payload }, (err, signal) => {
if (err) {
console.log("sendBroadcast: failed");
console.dir(err);
} else {
console.log("sendBroadcast: success");
console.dir(signal);
}
});

return {};
}

Expand Down

0 comments on commit 54dad75

Please sign in to comment.