Skip to content

Commit

Permalink
added mongodb, ws send event
Browse files Browse the repository at this point in the history
  • Loading branch information
viapip committed Mar 14, 2024
1 parent 347adb5 commit 3fe51bc
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 60 deletions.
3 changes: 2 additions & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@
"yatki.vscode-surround",
"yoavbls.pretty-ts-errors",
"Yseop.vscode-yseopml",
"zardoy.ts-essential-plugins"
"zardoy.ts-essential-plugins",
"mongodb.mongodb-vscode"
]
}
}
Expand Down
12 changes: 12 additions & 0 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,17 @@ services:
- redis-data:/data
command: dragonfly --default_lua_flags=allow-undeclared-keys

mongodb:
image: mongo:7.0.6
restart: unless-stopped
ports:
- '27017:27017'
volumes:
- mongodb-data:/data/db
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: example

volumes:
redis-data:
mongodb-data:
2 changes: 2 additions & 0 deletions lib/ajv/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ export async function createAjv() {

ajv.addSchema(schemas)
ajv.addSchema(userSchema, 'User')
// const test = ajv.getSchema('User')
// logger.info(test?.schemaEnv)

function validateSchema(schemaId: string, data: unknown) {
const valid = ajv.validate(schemaId, data)
Expand Down
43 changes: 21 additions & 22 deletions lib/guard/index.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,28 @@
import * as jose from 'jose'
import path from 'node:path'

const alg = 'ES256'
// const options = {
// issuer: "urn:example:issuer",
// audience: "urn:example:audience",
// };
const keys = await jose.generateKeyPair(alg, { extractable: true })
const keys2 = await jose.generateKeyPair(alg, { extractable: true })
// const keys3 = await jose.generateKeyPair(alg, { extractable: true });
export const alg = 'ES256'
const keysDir = path.join(__dirname, './keys')

const jwk1 = await jose.exportJWK(keys.publicKey).then((jwk) => {
jwk.kid = 'key1'
export const publicKeyPath = path.join(keysDir, 'public')
export const privateKeyPath = path.join(keysDir, 'private')
export function loadKeys() {

return jwk
})
}

const jwk2 = await jose.exportJWK(keys2.publicKey).then((jwk) => {
jwk.kid = 'key2'
// export async function writeKeys(obj: {
// name: string
// privateKey: jose.KeyLike
// publicKey: jose.KeyLike
// }) {

return jwk
})
// }

// const jwk3 = await jose.exportJWK(keys3.publicKey).then((jwk) => {
// jwk.kid = "key3";
// return jwk;
// });
// export async function generateKeyPair(name: string) {
// const { privateKey, publicKey } = await jose.generateKeyPair(alg, { extractable: true })

export const jwks = jose.createLocalJWKSet({ keys: [jwk1, jwk2] })
// // writeKeys({
// // name,
// // privateKey,
// // publicKey,
// // })
// }
22 changes: 22 additions & 0 deletions lib/guard/test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
function generateCombinations(test: { [key: string]: string[] }): string[][] {
let combinations: string[][] = [[]]
Object.keys(test).forEach((key) => {
const newCombinations: string[][] = []
test[key].forEach((value) => {
combinations.forEach((combination) => {
newCombinations.push([...combination, `${key}-${value}`])
})
})
combinations = newCombinations
})

return combinations
}

const test = {
size: ['xs', 's', 'm', 'l', 'xl'],
color: ['red', 'green', 'blue'],
material: ['wood', 'plastic', 'metal'],
}

console.log(generateCombinations(test))
16 changes: 16 additions & 0 deletions lib/mongo/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import consola from 'consola'
import { MongoClient } from 'mongodb'

const logger = consola.withTag('mongodb')
export async function createMongoDBStore() {
const client = new MongoClient('mongodb://root:example@mongodb:27017')

await client.connect()

const db = client.db('testSozdev')
logger.debug('Connected to MongoDB', db.databaseName)

return {
data: db.collection('data'),
}
}
6 changes: 3 additions & 3 deletions lib/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ export async function createRedisStore() {
return items.flat() as T[]
}

const insertOne = async (id: string, data: T) => {
const pattern = formatPattern(id)
const item = { ...data, id }
const insertOne = async (_id: string, data: T) => {
const pattern = formatPattern(_id)
const item = { ...data, _id }
await connection.json.set(pattern, '$', item)

return item as T
Expand Down
2 changes: 1 addition & 1 deletion lib/transformer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { decode, encode } from '@msgpack/msgpack'
import consola from 'consola'

const logger = consola.withTag('server/trpc')
const logger = consola.withTag('server/trpc/transformer')

function uint8ArrayToString(arr: Uint8Array) {
return Array.from(arr)
Expand Down
32 changes: 16 additions & 16 deletions lib/ws/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ import type { ClientOptions } from 'ws'
const logger = consola.withTag('client/ws')

type BufferLike =
| string
| Buffer
| DataView
| number
| ArrayBufferView
| Uint8Array
| ArrayBuffer
| SharedArrayBuffer
| readonly any[]
| readonly number[]
| { valueOf(): ArrayBuffer }
| { valueOf(): SharedArrayBuffer }
| { valueOf(): Uint8Array }
| { valueOf(): readonly number[] }
| { valueOf(): string }
| { [Symbol.toPrimitive](hint: string): string }
| string
| Buffer
| DataView
| number
| ArrayBufferView
| Uint8Array
| ArrayBuffer
| SharedArrayBuffer
| readonly any[]
| readonly number[]
| { valueOf(): ArrayBuffer }
| { valueOf(): SharedArrayBuffer }
| { valueOf(): Uint8Array }
| { valueOf(): readonly number[] }
| { valueOf(): string }
| { [Symbol.toPrimitive](hint: string): string }

export class WebSocketProxy extends WebSocket {
constructor(
Expand Down
27 changes: 19 additions & 8 deletions lib/ws/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,20 @@ export class WebSocketProxy<
return this.getProxy(this)
}

private getWrappedMethod<S>(target: S, prop: string) {
private getWrappedMethod<S extends WebSocketServer | WebSocket>(target: S, prop: string) {
const methods = {
on: this.customOn.bind(target),
send: this.customSend.bind(target),
emit: this.customSend.bind(target),
} as Record<string, (...args: any[]) => void>

return methods[prop]
}

private getProxy<S extends object>(t: S) {
private getProxy<S extends WebSocketServer | WebSocket>(t: S) {
return new Proxy(t, {
get: (target, prop, receiver) => {
if (prop === 'on') {
if (prop === 'on' || prop === 'send') {
return this.getWrappedMethod(target, prop)
}

Expand All @@ -71,18 +72,21 @@ export class WebSocketProxy<
listener: (this: WebSocketServer, ...args: any[]) => void,
) {
this.on(event, async (...args: any[]) => {
logger.info('Received', event)
if (event === 'connection') {
const arg = args as [socket: InstanceType<T>, request: InstanceType<U>]
const ws = this.getProxy(arg[0])
arg[0] = ws

listener.call(this, ...arg)

return
}

if (event === 'message') {
await sleep(100)
logger.info('Receiving', event, JSON.parse(args[0] as string))
logger.debug('Receiving', event, JSON.parse(args[0] as string))
// logger.debug('Receiving', event, JSON.parse(args[0] as string))
const [data, isBinary] = args as [BufferLike, boolean]
listener.call(this, data, isBinary)

Expand All @@ -93,10 +97,17 @@ export class WebSocketProxy<
})
}

private async customSend(event: string | symbol, data: BufferLike) {
private async customSend(
this: WebSocket | WebSocketServer,
data: BufferLike,
cb?: (error?: Error) => void,
) {
await sleep(100)
logger.info('Sending', event)
logger.debug('Sending', event)
this.emit(event, data)

logger.debug('Sending', data)

if ('send' in this) {
return this.send(data, cb)
}
}
}
8 changes: 4 additions & 4 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ while (true) {
})

if (typeof name !== 'string') {
console.log('Invalid name')
logger.info('Invalid name')
subscription.unsubscribe()
break
}

const { id } = await client.data.postItem.mutate({
const { _id } = await client.data.postItem.mutate({
id: `${Math.floor(Math.random() * 1000)}`,
schemaId: 'User',
data: {
Expand All @@ -80,9 +80,9 @@ while (true) {
email: `${name}@example.com`,
},
},
}) as { id: string }
}) as { _id: string }

const user = await client.data.getItem.query(id)
const user = await client.data.getItem.query(_id)
if (user) {
users.push(user)
logger.info('User', user)
Expand Down
3 changes: 3 additions & 0 deletions src/server/context.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createAjv } from '@/ajv'
import { bullmq } from '@/bullmq'
import { createMongoDBStore } from '@/mongo'
import { createRedisStore } from '@/redis'

import type { CreateHTTPContextOptions } from '@trpc/server/adapters/standalone'
Expand All @@ -12,6 +13,7 @@ export type CreateContextOptions =

const ajv = await createAjv()
const redis = await createRedisStore()
const mongodb = await createMongoDBStore()

export async function createContext(
_opts: CreateContextOptions,
Expand All @@ -20,5 +22,6 @@ export async function createContext(
ajv,
redis,
bullmq,
mongodb,
}
}
22 changes: 17 additions & 5 deletions src/server/routers/data.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { EventEmitter } from 'node:events'

import { observable } from '@trpc/server/observable'
import consola from 'consola'
import { z } from 'zod'
Expand All @@ -6,8 +8,10 @@ import { publicProcedure, rootRouter } from '../trpc'

import { queueEvents } from '@/bullmq/events'

const logger = consola.withTag('server')
import type { Document, OptionalId } from 'mongodb'

const logger = consola.withTag('server')
const ee = new EventEmitter()
export const dataRouter = rootRouter({
getAll: publicProcedure
.query(async ({
Expand All @@ -16,6 +20,7 @@ export const dataRouter = rootRouter({

getItem: publicProcedure
.input(z.string())
// .use(loggerMiddleware)
.query(async ({
input: id,
ctx: { redis },
Expand All @@ -28,8 +33,8 @@ export const dataRouter = rootRouter({
data: z.unknown(),
}))
.mutation(async ({
input: { id, schemaId, data },
ctx: { redis, ajv, bullmq },
input: { schemaId, data },
ctx: { redis, ajv, bullmq, mongodb },
}) => {
ajv.validateSchema(schemaId, data)
const job = await bullmq.add(
Expand All @@ -40,8 +45,10 @@ export const dataRouter = rootRouter({
logger.info(`Job ${job.id} added:`, JSON.stringify(data, null, 2))
// const returnvalue = await job.waitUntilFinished(queueEvents)
// logger.success(`Job ${job.id} result:`, returnvalue)
const { insertedId } = await mongodb.data.insertOne(data as OptionalId<Document>)
logger.info('insertedId:', insertedId.toJSON())

return redis.data.insertOne(id, data)
return redis.data.insertOne(insertedId.toJSON(), data)
}),

randomNumber: publicProcedure
Expand All @@ -50,8 +57,13 @@ export const dataRouter = rootRouter({
input: n,
ctx: { bullmq },
}) => observable<{ status: number }>((emit) => {
logger.info(`subscription: Running subscription with n = ${n}`)
const onData = (data: any) => {
emit.next(data)
}
ee.on('onData', onData)
queueEvents.on('completed', async ({ jobId }) => {
logger.info(`Job ${jobId} completed`)
logger.info(`subscription: Job ${jobId} completed`)
const job = await bullmq.getJob(jobId)
if (
job
Expand Down
Loading

0 comments on commit 3fe51bc

Please sign in to comment.