Skip to content

Commit

Permalink
feat(js-client)!: Multiformat MsgPack for particle data (#422)
Browse files Browse the repository at this point in the history
* feat(particle)!: Multiformat MsgPack for particle data

* Fix types

* fix(ci): use nox with msgpack protocol

* fix(avm): avm 0.59.0

* Fix uint64

* Fix

* fix(ci): enable nox debug logs

* Fix commonJS import

* Revert "fix(ci): enable nox debug logs"

This reverts commit ce5bc2e.

---------

Co-authored-by: Akim Mamedov <[email protected]>
Co-authored-by: folex <[email protected]>
Co-authored-by: Akim <[email protected]>
  • Loading branch information
4 people authored Jan 29, 2024
1 parent fe661db commit 8ac029b
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 24 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ jobs:
uses: fluencelabs/aqua/.github/workflows/tests.yml@main
with:
js-client-snapshots: "${{ needs.js-client.outputs.js-client-snapshots }}"
nox-image: "fluencelabs/nox:unstable"
nox-image: "docker.fluence.dev/nox:feat-VM-407-msgpack-particle"
flox:
needs:
- js-client

uses: fluencelabs/flox/.github/workflows/tests.yml@main
with:
js-client-snapshots: "${{ needs.js-client.outputs.js-client-snapshots }}"
nox-image: "fluencelabs/nox:unstable"
nox-image: "docker.fluence.dev/nox:feat-VM-407-msgpack-particle"
2 changes: 1 addition & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ jobs:
uses: ./.github/workflows/tests.yml
with:
ref: ${{ github.ref }}
nox-image: "fluencelabs/nox:unstable"
nox-image: "docker.fluence.dev/nox:feat-VM-407-msgpack-particle"
3 changes: 2 additions & 1 deletion packages/core/js-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
"author": "Fluence Labs",
"license": "Apache-2.0",
"dependencies": {
"@libp2p/utils": "5.2.2",
"@chainsafe/libp2p-noise": "14.0.0",
"@chainsafe/libp2p-yamux": "6.0.1",
"@fluencelabs/avm": "0.59.0",
Expand All @@ -44,10 +43,12 @@
"@libp2p/peer-id": "4.0.5",
"@libp2p/peer-id-factory": "4.0.5",
"@libp2p/ping": "1.0.10",
"@libp2p/utils": "5.2.2",
"@libp2p/websockets": "8.0.12",
"@multiformats/multiaddr": "12.1.12",
"bs58": "5.0.0",
"debug": "4.3.4",
"int64-buffer": "1.0.1",
"it-length-prefixed": "9.0.3",
"it-map": "3.0.5",
"it-pipe": "3.0.1",
Expand Down
12 changes: 5 additions & 7 deletions packages/core/js-client/src/connection/RelayConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ import map from "it-map";
import { pipe } from "it-pipe";
import { createLibp2p, Libp2p } from "libp2p";
import { Subject } from "rxjs";
import { fromString } from "uint8arrays/from-string";
import { toString } from "uint8arrays/to-string";

import { KeyPair } from "../keypair/index.js";
import { IParticle } from "../particle/interfaces.js";
import {
buildParticleMessage,
Particle,
serializeToString,
serializeParticle,
} from "../particle/Particle.js";
import { throwHasNoPeerId } from "../util/libp2pUtils.js";
import { logger } from "../util/logger.js";
Expand Down Expand Up @@ -216,7 +214,7 @@ export class RelayConnection implements IConnection {

const sink = stream.sink;

await pipe([fromString(serializeToString(particle))], encode, sink);
await pipe([serializeParticle(particle)], encode, sink);

log.trace(
"particle %s sent to %s",
Expand All @@ -225,11 +223,11 @@ export class RelayConnection implements IConnection {
);
}

private async processIncomingMessage(msg: string) {
private async processIncomingMessage(msg: Uint8Array) {
let particle: Particle | undefined;

try {
particle = Particle.fromString(msg);
particle = Particle.deserialize(msg);

log.trace(
"received particle %s from %s",
Expand Down Expand Up @@ -290,7 +288,7 @@ export class RelayConnection implements IConnection {
decode,
(source) => {
return map(source, (buf) => {
return toString(buf.subarray());
return buf.subarray();
});
},
async (source) => {
Expand Down
32 changes: 19 additions & 13 deletions packages/core/js-client/src/particle/Particle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
* limitations under the License.
*/

import { CallResultsArray } from "@fluencelabs/avm";
import {
CallResultsArray,
MulticodecRepr,
MsgPackRepr,
} from "@fluencelabs/avm";
import { JSONValue } from "@fluencelabs/interfaces";
import { fromUint8Array, toUint8Array } from "js-base64";
import int64Buffer from "int64-buffer";
import { concat } from "uint8arrays/concat";
import { v4 as uuidv4 } from "uuid";
import { z } from "zod";
Expand All @@ -27,14 +31,16 @@ import { numberToLittleEndianBytes } from "../util/bytes.js";

import { IParticle } from "./interfaces.js";

const particleRepr = new MulticodecRepr(new MsgPackRepr());

const particleSchema = z.object({
id: z.string(),
timestamp: z.number().positive(),
script: z.string(),
data: z.string(),
data: z.instanceof(Uint8Array),
ttl: z.number().positive(),
init_peer_id: z.string(),
signature: z.array(z.number()),
signature: z.instanceof(Uint8Array),
});

export class Particle implements IParticle {
Expand Down Expand Up @@ -73,10 +79,10 @@ export class Particle implements IParticle {
);
}

static fromString(str: string): Particle {
const json = JSON.parse(str);
static deserialize(bytes: Uint8Array): Particle {
const obj = particleRepr.fromBinary(bytes);

const res = particleSchema.safeParse(json);
const res = particleSchema.safeParse(obj);

if (!res.success) {
throw new Error(
Expand All @@ -92,10 +98,10 @@ export class Particle implements IParticle {
data.id,
data.timestamp,
data.script,
toUint8Array(data.data),
data.data,
data.ttl,
data.init_peer_id,
new Uint8Array(data.signature),
data.signature,
);
}
}
Expand Down Expand Up @@ -154,16 +160,16 @@ export const cloneWithNewData = (
/**
* Serializes particle into string suitable for sending through network
*/
export const serializeToString = (particle: IParticle): string => {
return JSON.stringify({
export const serializeParticle = (particle: IParticle): Uint8Array => {
return particleRepr.toBinary({
action: "Particle",
id: particle.id,
init_peer_id: particle.initPeerId,
timestamp: particle.timestamp,
timestamp: new int64Buffer.Uint64BE(particle.timestamp),
ttl: particle.ttl,
script: particle.script,
signature: Array.from(particle.signature),
data: fromUint8Array(particle.data),
data: Array.from(particle.data),
});
};

Expand Down
12 changes: 12 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8ac029b

Please sign in to comment.