Skip to content

Commit

Permalink
feat: test credo messaging (#3)
Browse files Browse the repository at this point in the history
Signed-off-by: Jason C. Leach <[email protected]>
  • Loading branch information
jleach authored Nov 28, 2024
1 parent 1132865 commit 6f38e4d
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 22 deletions.
89 changes: 80 additions & 9 deletions src/AgentCredo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import {
V2CredentialProtocol,
V2ProofProtocol,
WsOutboundTransport,
BasicMessageRecord,
RecordSavedEvent,
RepositoryEventTypes,
BaseRecord,
BaseEvent,

} from "@credo-ts/core";
import {
AnonCredsCredentialFormatService,
Expand Down Expand Up @@ -52,9 +58,12 @@ import {
} from "./lib";
import { IndyVdrPoolConfig } from "@credo-ts/indy-vdr";
import { OutOfBandRecord } from "@credo-ts/core";
import fs from "node:fs";
import path from "node:path";
import { homedir, tmpdir } from "node:os"

import type { Constructor } from '@credo-ts/core/build/utils/mixins'
import { Subscription, map, filter, pipe } from 'rxjs'

type BaseRecordAny = BaseRecord<any, any, any>
type RecordClass<R extends BaseRecordAny> = Constructor<R> & { type: string }

const createLinkSecretIfRequired = async (agent: Agent) => {
// If we don't have any link secrets yet, we will create a
Expand All @@ -74,9 +83,9 @@ export const createAgent = async (
logger?: Logger
) => {
const agentConfig: InitConfig = {
label: "afj-test",
label: "afj-test-2-z",
walletConfig: {
id: "afj-wallet-2",
id: "afj-wallet-2-z",
key: "testkey0000000000000000000000000",
},
autoUpdateStorageOnStartup: true,
Expand All @@ -103,7 +112,7 @@ export const createAgent = async (
networks: ledgers as [IndyVdrPoolConfig],
}),
mediationRecipient: new MediationRecipientModule({
mediatorInvitationUrl: config.mediatorInvitationUrl,
mediatorInvitationUrl: config.mediatorInvitationUrl ?? "",
mediatorPickupStrategy: MediatorPickupStrategy.Implicit,
}),
connections: new ConnectionsModule({
Expand Down Expand Up @@ -170,21 +179,31 @@ export const createAgent = async (

agent.registerOutboundTransport(wsTransport);
agent.registerOutboundTransport(httpTransport);
await agent.initialize().then((_) => {
console.log("AFJ Agent initialized");
});

await agent.initialize();

console.log("AFJ Agent initialized");

if (config.mediatorInvitationUrl) {
await agent.mediationRecipient.initiateMessagePickup(undefined, MediatorPickupStrategy.Implicit)
}

createLinkSecretIfRequired(agent);

const indyVdrPoolService =
agent.dependencyManager.resolve(IndyVdrPoolService);
await Promise.all(
indyVdrPoolService.pools.map((pool) =>
(pool as unknown as any).pool.refresh()
)
);

return agent;
};

export class AgentCredo implements AriesAgent {
public basicMessageReceivedSubscription?: Subscription;

config: any;
ledgers: any[];
public readonly logger: Logger;
Expand Down Expand Up @@ -216,30 +235,72 @@ export class AgentCredo implements AriesAgent {
)[]
>;
}>;

public constructor(config: any, ledgers: any, logger: Logger) {
this.config = config;
this.ledgers = ledgers;
this.logger = logger;
}

set onBasicMessageReceived(messageRecievedCb: (message: BasicMessageRecord) => void) {
if (messageRecievedCb) {
if (this.basicMessageReceivedSubscription) {
this.basicMessageReceivedSubscription.unsubscribe()
}

const sub = this.recordsAddedByType(BasicMessageRecord).subscribe(messageRecievedCb)
this.basicMessageReceivedSubscription = sub

return
}

this.basicMessageReceivedSubscription?.unsubscribe()
this.basicMessageReceivedSubscription = undefined
}

private filterByType = <R extends BaseRecordAny>(recordClass: RecordClass<R>) => {
return pipe(
map((event: BaseEvent) => (event.payload as Record<string, R>).record),
filter((record: R) => record.type === recordClass.type),
)
}

private recordsAddedByType = <R extends BaseRecordAny>(recordClass: RecordClass<R>) => {
if (!this.agent) {
throw new Error('Agent is required to check record type')
}

if (!recordClass) {
throw new Error("The recordClass can't be undefined")
}

return this.agent?.events.observable<RecordSavedEvent<R>>(RepositoryEventTypes.RecordSaved).pipe(this.filterByType(recordClass))
}

sendBasicMessage(connection_id: string, content: string): Promise<any> {
return this.agent.basicMessages.sendMessage(connection_id, content)
}

sendOOBConnectionlessProofRequest(
_builder: ProofRequestBuilder
): Promise<any | undefined> {
throw new Error("Method not implemented.");
}

waitForPresentation(_presentation_exchange_id: string): Promise<void> {
throw new Error("Method not implemented.");
}

sendConnectionlessProofRequest(
_builder: ProofRequestBuilder
): Promise<any | undefined> {
throw new Error("Method not implemented.");
}

async acceptCredentialOffer(offer: CredentialOfferRef): Promise<void> {
await this.agent.credentials.acceptOffer({ credentialRecordId: offer.id });
}

async acceptProof(proof: AcceptProofArgs): Promise<void> {

while (true) {
Expand All @@ -264,6 +325,7 @@ export class AgentCredo implements AriesAgent {
waitFor(1000);
}
}

async findCredentialOffer(connectionId: string): Promise<CredentialOfferRef> {
let cred!: CredentialExchangeRecord;
while (cred === undefined) {
Expand All @@ -273,17 +335,21 @@ export class AgentCredo implements AriesAgent {
}
return { id: cred.id, connection_id: cred.connectionId as string };
}

createSchemaCredDefinition(
_credDefBuilder: CredentialDefinitionBuilder
): Promise<string | undefined> {
throw new Error("Method not implemented.");
}

createSchema(_builder: SchemaBuilder): Promise<string | undefined> {
throw new Error("Method not implemented.");
}

async createInvitationToConnect<T extends INVITATION_TYPE>(_invitationType: T): Promise<CreateInvitationResponse<T>> {
throw new Error("Method not implemented.");
}

async receiveInvitation(
inv: ResponseCreateInvitation
): Promise<ReceiveInvitationResponse> {
Expand All @@ -305,6 +371,7 @@ export class AgentCredo implements AriesAgent {
autoAcceptConnection: true,
reuseConnection: true,
});

const invitationRequestsThreadIds =
outOfBandRecord.getTags().invitationRequestsThreadIds;

Expand All @@ -330,11 +397,15 @@ export class AgentCredo implements AriesAgent {
connectionRecord: { connection_id: connectionRecord?.id as string },
};
}

public async startup() {
this.agent = await createAgent(this.config, this.ledgers, this.logger);
}

public async shutdown() {
await this.agent?.mediationRecipient?.stopMessagePickup();
await this.agent?.shutdown();

this.basicMessageReceivedSubscription?.unsubscribe()
}
}
3 changes: 2 additions & 1 deletion src/AgentTraction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,8 @@ export class AgentTraction implements AriesAgent {
}
})
}
async sendBasicMessage(connection_id: string, content: string): Promise<any>{
async sendBasicMessage(connection_id: string, content: string): Promise<any> {

return await this.axios.post(`/connections/${connection_id}/send-message`,{
content
}, {
Expand Down
103 changes: 91 additions & 12 deletions src/credo.test.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,102 @@
import { describe, expect, test } from "@jest/globals";
import { createAgent } from "./AgentCredo";
import { ConsoleLogger, LogLevel } from "@credo-ts/core";
type AgentType = Awaited<ReturnType<typeof createAgent>>
import { LogLevel, WsOutboundTransport } from "@credo-ts/core";
import { AgentTraction } from "./AgentTraction";
import { AgentCredo } from "./AgentCredo";
import {PinoLogger } from "./lib";
import pino from "pino";
import { INVITATION_TYPE } from "./Agent";

describe.skip("credo", () => {
// type AgentType = Awaited<ReturnType<typeof createAgent>>

const delay = (ms: number) => {
return new Promise(resolve => setTimeout(resolve, ms));
}

const loggerTransport = pino.transport({
targets: [
{
level: 'trace',
target: 'pino/file',
options: {
destination: `./logs/run.log.ndjson`,
autoEnd: true,
},
}
],
})

describe("credo", () => {
const _logger = pino({ level: 'trace', timestamp: pino.stdTimeFunctions.isoTime, }, loggerTransport);
const logger = new PinoLogger(_logger, LogLevel.trace)
// eslint-disable-next-line @typescript-eslint/no-var-requires
const ledgers = require("../ledgers.json");
let agent: AgentType;

// eslint-disable-next-line @typescript-eslint/no-var-requires
const config = require("../local.env.json");
let holderAgent: AgentCredo;
let issuerAgent: AgentTraction;

beforeAll(async () => {
console.log('1 - beforeAll')
agent = await createAgent({}, ledgers, new ConsoleLogger(LogLevel.trace));
}, 20000);
console.log('beforeAll')
holderAgent = new AgentCredo(config.holderX, ledgers, logger);
issuerAgent = new AgentTraction(config.issuer, logger);

await holderAgent.startup()
await issuerAgent.startup()
}, 50000);

afterAll(async () => {
console.log('1 - afterAll')
await agent.mediationRecipient?.stopMessagePickup();
await agent.shutdown();
console.log('afterAll')

await holderAgent.shutdown()
await issuerAgent.shutdown()
}, 20000);

test("something", async () => {
expect(1 + 2).toBe(3);
}, 20000);

test("receive basic messages", async () => {
const messageCount = 2
const remoteInvitation = await issuerAgent.createInvitationToConnect(INVITATION_TYPE.CONN_1_0)
const issuerAgentConnectionRef = await holderAgent.receiveInvitation(remoteInvitation)

logger.info(`waiting for issuer to issuerAgent connection`)
await issuerAgent.waitForConnectionReady(remoteInvitation.payload.connection_id as string)

logger.info(`${remoteInvitation.payload.connection_id} connected to ${issuerAgentConnectionRef.connectionRecord?.connection_id}`)

// await holderAgent.shutdown()

const messageReceivedPromise = new Promise<number>((resolve) => {
let callCount = 0;
holderAgent.onBasicMessageReceived = (message) => {
console.log('basic message received:', message.content);
callCount++;
if (callCount === messageCount) {
resolve(callCount);
}
};
});

await holderAgent.agent.mediationRecipient.stopMessagePickup();

logger.info(`waiting for 20 seconds`)
await delay(20000); // Pause for 20 seconds

// await holderAgent.agent.outboundTransports[0].stop();
// await holderAgent.agent.mediationRecipient.stopMessagePickup();

logger.info(`sending ${messageCount} basic message(s) to ${remoteInvitation.payload.connection_id}`)

await issuerAgent.sendBasicMessage(remoteInvitation.payload.connection_id as string, 'Hello from issuer 1')
await issuerAgent.sendBasicMessage(remoteInvitation.payload.connection_id as string, 'Hello from issuer 2')

await delay(5000);

await holderAgent.agent.mediationRecipient.initiateMessagePickup();
// await holderAgent.agent.outboundTransports[0].start(holderAgent.agent)
// await holderAgent.agent.mediationRecipient.initiateMessagePickup();

expect(await messageReceivedPromise).toBe(messageCount);
}, 35000);
});

0 comments on commit 6f38e4d

Please sign in to comment.