diff --git a/src/AgentCredo.ts b/src/AgentCredo.ts index 1038b49..8d5ad88 100644 --- a/src/AgentCredo.ts +++ b/src/AgentCredo.ts @@ -24,6 +24,12 @@ import { V2CredentialProtocol, V2ProofProtocol, WsOutboundTransport, + BasicMessageRecord, + RecordSavedEvent, + RepositoryEventTypes, + BaseRecord, + BaseEvent, + } from "@credo-ts/core"; import { AnonCredsCredentialFormatService, @@ -52,9 +58,12 @@ import { } from "./lib"; import { IndyVdrPoolConfig } from "@credo-ts/indy-vdr"; import { OutOfBandRecord } from "@credo-ts/core"; -import fs from "node:fs"; -import path from "node:path"; -import { homedir, tmpdir } from "node:os" + +import type { Constructor } from '@credo-ts/core/build/utils/mixins' +import { Subscription, map, filter, pipe } from 'rxjs' + +type BaseRecordAny = BaseRecord +type RecordClass = Constructor & { type: string } const createLinkSecretIfRequired = async (agent: Agent) => { // If we don't have any link secrets yet, we will create a @@ -74,9 +83,9 @@ export const createAgent = async ( logger?: Logger ) => { const agentConfig: InitConfig = { - label: "afj-test", + label: "afj-test-2-z", walletConfig: { - id: "afj-wallet-2", + id: "afj-wallet-2-z", key: "testkey0000000000000000000000000", }, autoUpdateStorageOnStartup: true, @@ -103,7 +112,7 @@ export const createAgent = async ( networks: ledgers as [IndyVdrPoolConfig], }), mediationRecipient: new MediationRecipientModule({ - mediatorInvitationUrl: config.mediatorInvitationUrl, + mediatorInvitationUrl: config.mediatorInvitationUrl ?? "", mediatorPickupStrategy: MediatorPickupStrategy.Implicit, }), connections: new ConnectionsModule({ @@ -170,10 +179,17 @@ export const createAgent = async ( agent.registerOutboundTransport(wsTransport); agent.registerOutboundTransport(httpTransport); - await agent.initialize().then((_) => { - console.log("AFJ Agent initialized"); - }); + + await agent.initialize(); + + console.log("AFJ Agent initialized"); + + if (config.mediatorInvitationUrl) { + await agent.mediationRecipient.initiateMessagePickup(undefined, MediatorPickupStrategy.Implicit) + } + createLinkSecretIfRequired(agent); + const indyVdrPoolService = agent.dependencyManager.resolve(IndyVdrPoolService); await Promise.all( @@ -181,10 +197,13 @@ export const createAgent = async ( (pool as unknown as any).pool.refresh() ) ); + return agent; }; export class AgentCredo implements AriesAgent { + public basicMessageReceivedSubscription?: Subscription; + config: any; ledgers: any[]; public readonly logger: Logger; @@ -216,30 +235,72 @@ export class AgentCredo implements AriesAgent { )[] >; }>; + public constructor(config: any, ledgers: any, logger: Logger) { this.config = config; this.ledgers = ledgers; this.logger = logger; } + + set onBasicMessageReceived(messageRecievedCb: (message: BasicMessageRecord) => void) { + if (messageRecievedCb) { + if (this.basicMessageReceivedSubscription) { + this.basicMessageReceivedSubscription.unsubscribe() + } + + const sub = this.recordsAddedByType(BasicMessageRecord).subscribe(messageRecievedCb) + this.basicMessageReceivedSubscription = sub + + return + } + + this.basicMessageReceivedSubscription?.unsubscribe() + this.basicMessageReceivedSubscription = undefined + } + + private filterByType = (recordClass: RecordClass) => { + return pipe( + map((event: BaseEvent) => (event.payload as Record).record), + filter((record: R) => record.type === recordClass.type), + ) + } + + private recordsAddedByType = (recordClass: RecordClass) => { + if (!this.agent) { + throw new Error('Agent is required to check record type') + } + + if (!recordClass) { + throw new Error("The recordClass can't be undefined") + } + + return this.agent?.events.observable>(RepositoryEventTypes.RecordSaved).pipe(this.filterByType(recordClass)) + } + sendBasicMessage(connection_id: string, content: string): Promise { return this.agent.basicMessages.sendMessage(connection_id, content) } + sendOOBConnectionlessProofRequest( _builder: ProofRequestBuilder ): Promise { throw new Error("Method not implemented."); } + waitForPresentation(_presentation_exchange_id: string): Promise { throw new Error("Method not implemented."); } + sendConnectionlessProofRequest( _builder: ProofRequestBuilder ): Promise { throw new Error("Method not implemented."); } + async acceptCredentialOffer(offer: CredentialOfferRef): Promise { await this.agent.credentials.acceptOffer({ credentialRecordId: offer.id }); } + async acceptProof(proof: AcceptProofArgs): Promise { while (true) { @@ -264,6 +325,7 @@ export class AgentCredo implements AriesAgent { waitFor(1000); } } + async findCredentialOffer(connectionId: string): Promise { let cred!: CredentialExchangeRecord; while (cred === undefined) { @@ -273,17 +335,21 @@ export class AgentCredo implements AriesAgent { } return { id: cred.id, connection_id: cred.connectionId as string }; } + createSchemaCredDefinition( _credDefBuilder: CredentialDefinitionBuilder ): Promise { throw new Error("Method not implemented."); } + createSchema(_builder: SchemaBuilder): Promise { throw new Error("Method not implemented."); } + async createInvitationToConnect(_invitationType: T): Promise> { throw new Error("Method not implemented."); } + async receiveInvitation( inv: ResponseCreateInvitation ): Promise { @@ -305,6 +371,7 @@ export class AgentCredo implements AriesAgent { autoAcceptConnection: true, reuseConnection: true, }); + const invitationRequestsThreadIds = outOfBandRecord.getTags().invitationRequestsThreadIds; @@ -330,11 +397,15 @@ export class AgentCredo implements AriesAgent { connectionRecord: { connection_id: connectionRecord?.id as string }, }; } + public async startup() { this.agent = await createAgent(this.config, this.ledgers, this.logger); } + public async shutdown() { await this.agent?.mediationRecipient?.stopMessagePickup(); await this.agent?.shutdown(); + + this.basicMessageReceivedSubscription?.unsubscribe() } } diff --git a/src/AgentTraction.ts b/src/AgentTraction.ts index e5cc7ef..30d5ae3 100644 --- a/src/AgentTraction.ts +++ b/src/AgentTraction.ts @@ -705,7 +705,8 @@ export class AgentTraction implements AriesAgent { } }) } - async sendBasicMessage(connection_id: string, content: string): Promise{ + async sendBasicMessage(connection_id: string, content: string): Promise { + return await this.axios.post(`/connections/${connection_id}/send-message`,{ content }, { diff --git a/src/credo.test.ts b/src/credo.test.ts index 9d09df1..ad8a4de 100644 --- a/src/credo.test.ts +++ b/src/credo.test.ts @@ -1,23 +1,102 @@ import { describe, expect, test } from "@jest/globals"; -import { createAgent } from "./AgentCredo"; -import { ConsoleLogger, LogLevel } from "@credo-ts/core"; -type AgentType = Awaited> +import { LogLevel, WsOutboundTransport } from "@credo-ts/core"; +import { AgentTraction } from "./AgentTraction"; +import { AgentCredo } from "./AgentCredo"; +import {PinoLogger } from "./lib"; +import pino from "pino"; +import { INVITATION_TYPE } from "./Agent"; -describe.skip("credo", () => { +// type AgentType = Awaited> + +const delay = (ms: number) => { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +const loggerTransport = pino.transport({ + targets: [ + { + level: 'trace', + target: 'pino/file', + options: { + destination: `./logs/run.log.ndjson`, + autoEnd: true, + }, + } + ], +}) + +describe("credo", () => { + const _logger = pino({ level: 'trace', timestamp: pino.stdTimeFunctions.isoTime, }, loggerTransport); + const logger = new PinoLogger(_logger, LogLevel.trace) // eslint-disable-next-line @typescript-eslint/no-var-requires const ledgers = require("../ledgers.json"); - let agent: AgentType; - + // eslint-disable-next-line @typescript-eslint/no-var-requires + const config = require("../local.env.json"); + let holderAgent: AgentCredo; + let issuerAgent: AgentTraction; + beforeAll(async () => { - console.log('1 - beforeAll') - agent = await createAgent({}, ledgers, new ConsoleLogger(LogLevel.trace)); - }, 20000); + console.log('beforeAll') + holderAgent = new AgentCredo(config.holderX, ledgers, logger); + issuerAgent = new AgentTraction(config.issuer, logger); + + await holderAgent.startup() + await issuerAgent.startup() + }, 50000); + afterAll(async () => { - console.log('1 - afterAll') - await agent.mediationRecipient?.stopMessagePickup(); - await agent.shutdown(); + console.log('afterAll') + + await holderAgent.shutdown() + await issuerAgent.shutdown() }, 20000); + test("something", async () => { expect(1 + 2).toBe(3); }, 20000); + + test("receive basic messages", async () => { + const messageCount = 2 + const remoteInvitation = await issuerAgent.createInvitationToConnect(INVITATION_TYPE.CONN_1_0) + const issuerAgentConnectionRef = await holderAgent.receiveInvitation(remoteInvitation) + + logger.info(`waiting for issuer to issuerAgent connection`) + await issuerAgent.waitForConnectionReady(remoteInvitation.payload.connection_id as string) + + logger.info(`${remoteInvitation.payload.connection_id} connected to ${issuerAgentConnectionRef.connectionRecord?.connection_id}`) + + // await holderAgent.shutdown() + + const messageReceivedPromise = new Promise((resolve) => { + let callCount = 0; + holderAgent.onBasicMessageReceived = (message) => { + console.log('basic message received:', message.content); + callCount++; + if (callCount === messageCount) { + resolve(callCount); + } + }; + }); + + await holderAgent.agent.mediationRecipient.stopMessagePickup(); + + logger.info(`waiting for 20 seconds`) + await delay(20000); // Pause for 20 seconds + + // await holderAgent.agent.outboundTransports[0].stop(); + // await holderAgent.agent.mediationRecipient.stopMessagePickup(); + + logger.info(`sending ${messageCount} basic message(s) to ${remoteInvitation.payload.connection_id}`) + + await issuerAgent.sendBasicMessage(remoteInvitation.payload.connection_id as string, 'Hello from issuer 1') + await issuerAgent.sendBasicMessage(remoteInvitation.payload.connection_id as string, 'Hello from issuer 2') + + await delay(5000); + + await holderAgent.agent.mediationRecipient.initiateMessagePickup(); + // await holderAgent.agent.outboundTransports[0].start(holderAgent.agent) + // await holderAgent.agent.mediationRecipient.initiateMessagePickup(); + + expect(await messageReceivedPromise).toBe(messageCount); + }, 35000); });