Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/bridge experiments #1192

Merged
merged 21 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"--forceExit",
"--runInBand",
"--testTimeout=120000",
"packages/modules/packages/archivist/packages/indexeddb/src/spec/Archivist.spec.ts"
"packages/modules/packages/bridge/packages/http/src/spec/HttpBridge.caching.spec.ts"
],
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen",
Expand Down
7 changes: 7 additions & 0 deletions packages/modules/packages/bridge/packages/http/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@
"@xylabs/ts-scripts-yarn3": "^3.2.42",
"@xylabs/tsconfig": "^3.2.42",
"@xyo-network/account": "workspace:~",
"@xyo-network/archivist-memory": "workspace:~",
"@xyo-network/archivist-model": "workspace:~",
"@xyo-network/boundwitness-builder": "workspace:~",
"@xyo-network/diviner-boundwitness-memory": "workspace:~",
"@xyo-network/diviner-boundwitness-model": "workspace:~",
"@xyo-network/diviner-model": "workspace:~",
"@xyo-network/hash": "workspace:~",
"@xyo-network/module-abstract": "workspace:~",
"@xyo-network/node-memory": "workspace:~",
"@xyo-network/payload-wrapper": "workspace:~",
"typescript": "^5.3.3"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
import { assertEx } from '@xylabs/assert'
import { Account } from '@xyo-network/account'
import { MemoryArchivist } from '@xyo-network/archivist-memory'
import { ArchivistInsertQuery, ArchivistInsertQuerySchema, ArchivistInstance, asArchivistInstance } from '@xyo-network/archivist-model'
import { QueryBoundWitnessBuilder } from '@xyo-network/boundwitness-builder'
import { isBoundWitness, isQueryBoundWitness } from '@xyo-network/boundwitness-model'
import { MemoryBoundWitnessDiviner } from '@xyo-network/diviner-boundwitness-memory'
import { BoundWitnessDivinerQuerySchema } from '@xyo-network/diviner-boundwitness-model'
import { DivinerInstance } from '@xyo-network/diviner-model'
import { PayloadHasher } from '@xyo-network/hash'
import { AbstractModule } from '@xyo-network/module-abstract'
import { ModuleQueryResult } from '@xyo-network/module-model'
import { MemoryNode } from '@xyo-network/node-memory'
import { Payload, QueryFields } from '@xyo-network/payload-model'
import { PayloadWrapper } from '@xyo-network/payload-wrapper'

interface IntermediateNode {
commandArchivist: ArchivistInstance
commandArchivistBoundWitnessDiviner: DivinerInstance
queryResponseArchivist: ArchivistInstance
queryResponseArchivistBoundWitnessDiviner: DivinerInstance
}

interface BridgeClient {
// bridge: BridgeInstance
bridgeQueryResponseArchivist: ArchivistInstance
commandStateStoreArchivist: ArchivistInstance
module: AbstractModule
}

/**
* @group module
* @group bridge
*/

describe.skip('HttpBridge.caching', () => {
let intermediateNode: IntermediateNode
let clients: BridgeClient[]
const payload = PayloadWrapper.parse({ salt: Date.now(), schema: 'network.xyo.test' })?.jsonPayload() as Payload
let sourceQueryHash: string
let response: ModuleQueryResult
beforeAll(async () => {
const intermediateNodeAccount = await Account.create()
const node = await MemoryNode.create({ account: intermediateNodeAccount })

const commandArchivistAccount = await Account.create()
const commandArchivist = await MemoryArchivist.create({
account: commandArchivistAccount,
config: { schema: MemoryArchivist.configSchema },
})

const commandArchivistBoundWitnessDivinerAccount = await Account.create()
const commandArchivistBoundWitnessDiviner = await MemoryBoundWitnessDiviner.create({
account: commandArchivistBoundWitnessDivinerAccount,
config: { archivist: commandArchivist.address, schema: MemoryBoundWitnessDiviner.configSchema },
})

const queryResponseArchivistAccount = await Account.create()
const queryResponseArchivist = await MemoryArchivist.create({
account: queryResponseArchivistAccount,
config: { schema: MemoryArchivist.configSchema },
})

const queryResponseArchivistBoundWitnessDivinerAccount = await Account.create()
const queryResponseArchivistBoundWitnessDiviner = await MemoryBoundWitnessDiviner.create({
account: queryResponseArchivistBoundWitnessDivinerAccount,
config: { archivist: queryResponseArchivist.address, schema: MemoryBoundWitnessDiviner.configSchema },
})

intermediateNode = {
commandArchivist,
commandArchivistBoundWitnessDiviner,
queryResponseArchivist,
queryResponseArchivistBoundWitnessDiviner,
}

for (const mod of Object.values(intermediateNode)) {
await node.register(mod)
await node.attach(mod.address, true)
}

clients = await Promise.all(
['A', 'B'].map(async () => {
const clientNodeAccount = await Account.create()
const clientNode = await MemoryNode.create({ account: clientNodeAccount })

const commandStateStoreArchivistAccount = await Account.create()
const commandStateStoreArchivist = await MemoryArchivist.create({
account: commandStateStoreArchivistAccount,
config: { schema: MemoryArchivist.configSchema },
})

const bridgeQueryResponseArchivistAccount = await Account.create()
const bridgeQueryResponseArchivist = await MemoryArchivist.create({
account: bridgeQueryResponseArchivistAccount,
config: { schema: MemoryArchivist.configSchema },
})

const moduleAccount = await Account.create()
const module = await MemoryArchivist.create({
account: moduleAccount,
config: { schema: MemoryArchivist.configSchema },
})

const cachingBridge: BridgeClient = { bridgeQueryResponseArchivist, commandStateStoreArchivist, module }
for (const mod of Object.values(cachingBridge)) {
await clientNode.register(mod)
await clientNode.attach(mod.address, true)
}
return cachingBridge
}),
)
})

it('Module A issues command', async () => {
const source = clients[0]
const destination = clients[1]
const query: ArchivistInsertQuery = { address: destination.module.account.address, schema: ArchivistInsertQuerySchema }
const [command, payloads] = await new QueryBoundWitnessBuilder().witness(source.module.account).query(query).payloads([payload]).build()
sourceQueryHash = await PayloadHasher.hashAsync(command)
const insertResult = await intermediateNode.commandArchivist.insert([command, ...payloads])
expect(insertResult).toBeDefined()
expect(insertResult).toBeArrayOfSize(1 + payloads.length)
})
it('Module B receives command', async () => {
const destination = clients[1].module
const { commandArchivist, commandArchivistBoundWitnessDiviner } = intermediateNode
// TODO: Retrieve offset from state store
const offset = 0
// TODO: Filter for commands to us by address
const query = { limit: 1, offset, payload_schemas: [ArchivistInsertQuerySchema], schema: BoundWitnessDivinerQuerySchema, sort: 'asc' }
const commands = await commandArchivistBoundWitnessDiviner.divine([query])
expect(commands).toBeArray()
expect(commands.length).toBeGreaterThan(0)
for (const command of commands.filter(isQueryBoundWitness)) {
const commandPayloads = await PayloadHasher.toMap(await commandArchivist.get(command.payload_hashes))
const query = commandPayloads?.[command.query] as Payload<QueryFields>
if (query && query?.address === destination.address && destination.queries.includes(query.schema)) {
// Issue query against module
response = await destination.query(command, Object.values(commandPayloads))
expect(response).toBeDefined()
}
}
const archivist = asArchivistInstance(destination, 'Failed to cast archivist')
expect(archivist?.all).toBeFunction()
const all = await archivist.all?.()
expect(all).toBeArrayOfSize(1)
expect(all?.[0]).toEqual(payload)
})
it('Module B issues response', async () => {
const { queryResponseArchivist } = intermediateNode
const [bw, payloads, errors] = response
const insertResult = await queryResponseArchivist.insert([bw, ...payloads, ...errors])
expect(insertResult).toBeDefined()
expect(insertResult).toBeArrayOfSize(1 + payloads.length)
})
it('Module A receives response', async () => {
const destination = clients[1]
const { queryResponseArchivist, queryResponseArchivistBoundWitnessDiviner } = intermediateNode
// Attach event handler to archivist insert
const done = new Promise((resolve, reject) => {
destination.bridgeQueryResponseArchivist.on('inserted', async (insertResult) => {
await Promise.resolve()
const bw = insertResult.payloads.find(isBoundWitness)
if (bw) {
// Filter specifically for the sourceQuery for the hash we issued
if (bw?.sourceQuery === sourceQueryHash) {
const payloads = insertResult.payloads.filter((payload) => payload !== bw)
const rematerializedResponse: ModuleQueryResult = [bw, payloads, []]
resolve(rematerializedResponse)
}
} else {
reject()
}
})
})
// TODO: Retrieve offset from state store
const offset = 0
// Filter BWs specifically for the sourceQuery for the hash we issued to the address we issued it to
const addresses = [destination.module.address]
const query = { addresses, limit: 1, offset, schema: BoundWitnessDivinerQuerySchema, sort: 'asc', sourceQuery: sourceQueryHash }
const queryResponseResults = await queryResponseArchivistBoundWitnessDiviner.divine([query])
expect(queryResponseResults).toBeArray()
expect(queryResponseResults.length).toBe(1)
// TODO: Identity function for isQueryBoundWitnessResponse/ModuleQueryResult
const queryResponseResult = queryResponseResults.find(isBoundWitness)
expect(queryResponseResult).toBeDefined()
const queryResponsePayloadHashes = assertEx(queryResponseResult, 'Failed to get queryResponseHash').payload_hashes
const queryResponsePayloads = await queryResponseArchivist.get(queryResponsePayloadHashes)
expect(queryResponsePayloads).toBeArrayOfSize(1)
// Insert into bridgeQueryResponseArchivist
await destination.bridgeQueryResponseArchivist.insert([assertEx(queryResponseResult), ...queryResponsePayloads])
return done
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ export class MemoryBoundWitnessDiviner<TParams extends BoundWitnessDivinerParams
const filter = assertEx(payloads?.filter(isBoundWitnessDivinerQueryPayload)?.pop(), 'Missing query payload')
if (!filter) return []
const archivist = assertEx(await this.getArchivist(), 'Unable to resolve archivist')
const { addresses, payload_hashes, payload_schemas, limit, offset, order } = filter
const { addresses, payload_hashes, payload_schemas, limit, offset, order, sourceQuery } = filter
let bws = ((await archivist?.all?.()) ?? []).filter(isBoundWitness)
if (order === 'desc') bws = bws.reverse()
const allAddresses = addresses?.map((address) => hexFromHexString(address)).filter(exists)
if (allAddresses?.length) bws = bws.filter((bw) => containsAll(bw.addresses, allAddresses))
if (payload_hashes?.length) bws = bws.filter((bw) => containsAll(bw.payload_hashes, payload_hashes))
if (payload_schemas?.length) bws = bws.filter((bw) => containsAll(bw.payload_schemas, payload_schemas))
if (sourceQuery) bws = bws.filter((bw) => bw.sourceQuery === sourceQuery)
const parsedLimit = limit ?? bws.length
const parsedOffset = offset ?? 0
return bws.slice(parsedOffset, parsedLimit)
Expand Down
7 changes: 7 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4746,11 +4746,18 @@ __metadata:
"@xyo-network/abstract-bridge": "workspace:~"
"@xyo-network/account": "workspace:~"
"@xyo-network/api-models": "workspace:~"
"@xyo-network/archivist-memory": "workspace:~"
"@xyo-network/archivist-model": "workspace:~"
"@xyo-network/boundwitness-builder": "workspace:~"
"@xyo-network/boundwitness-model": "workspace:~"
"@xyo-network/bridge-model": "workspace:~"
"@xyo-network/config-payload-plugin": "workspace:~"
"@xyo-network/diviner-boundwitness-memory": "workspace:~"
"@xyo-network/diviner-boundwitness-model": "workspace:~"
"@xyo-network/diviner-model": "workspace:~"
"@xyo-network/hash": "workspace:~"
"@xyo-network/manifest-model": "workspace:~"
"@xyo-network/module-abstract": "workspace:~"
"@xyo-network/module-model": "workspace:~"
"@xyo-network/node-memory": "workspace:~"
"@xyo-network/node-model": "workspace:~"
Expand Down