diff --git a/chat-server/src/main.ts b/chat-server/src/main.ts index 085362e..0da22b0 100644 --- a/chat-server/src/main.ts +++ b/chat-server/src/main.ts @@ -3,6 +3,7 @@ import { SocketModule } from './socket.module'; async function bootstrap() { const app = await NestFactory.create(SocketModule); + app.enableCors({ origin: '*', credentials: true }); await app.listen(8080); } bootstrap(); diff --git a/chat-server/src/queue-manager/manager.module.ts b/chat-server/src/queue-manager/manager.module.ts index c8b7fb3..57ac565 100644 --- a/chat-server/src/queue-manager/manager.module.ts +++ b/chat-server/src/queue-manager/manager.module.ts @@ -1,6 +1,5 @@ import { CacheModule, Module } from '@nestjs/common'; import * as redisStore from 'cache-manager-ioredis'; -// import type { ClientOpts } from 'redis'; import { ManagerService } from './manager.service'; import { BullModule } from '@nestjs/bull'; diff --git a/chat-server/src/queue-manager/manager.service.ts b/chat-server/src/queue-manager/manager.service.ts index fb1f3e9..df0caca 100644 --- a/chat-server/src/queue-manager/manager.service.ts +++ b/chat-server/src/queue-manager/manager.service.ts @@ -1,36 +1,29 @@ import { CACHE_MANAGER, Inject, Injectable, Scope } from '@nestjs/common'; import { Cache } from 'cache-manager'; import * as Bull from 'bull'; +import { Socket } from 'socket.io'; @Injectable({ scope: Scope.DEFAULT }) export class ManagerService { constructor( @Inject(CACHE_MANAGER) private redisCache: Cache, - @Inject(Map) private map: Map, + @Inject(Map) private qMap: Map, + @Inject(Map) private sMap: Map, ) {} async generateQueue(name: string) { const queue = new Bull(name); queue.pause(); - this.map.set(name, queue); // 서버 메모리에 key: "7:June1010", val: Queue Instance 저장해주기 - console.log('generateQueue: ', this.map); - // queue.process(메시지 전송하는 콜백함수) [2] : process 등록 - // queue.pause() [1] : 모집신청/참가신청 - // queue.resume() [3] : 온라인일경우 process 재개 socket.onconnect() - // socket.ondisconnect() -> pause() - // socket에 합치는게 좋을 듯 -> 석준의 의견 + this.qMap.set(name, queue); return queue; } - // 특정 큐에 소켓 전송 이벤트 emit/broadcast하는 걸 여기에 등록해주는 메서드. - // registerCallback(name: string, func: Function) {queue.process(func)} - async deleteOneQueue(name: string) { const deleteWork = []; const keyArr = await this.redisCache.store.keys(`bull:${name}:*`); keyArr.map((key: string) => { deleteWork.push(this.redisCache.del(key)); }); // bull.js Queue 지우는용 - this.map.delete(name); // 매핑된 인스턴스 지우는용 + this.qMap.delete(name); // 매핑된 인스턴스 지우는용 return Promise.all(deleteWork); } @@ -40,29 +33,40 @@ export class ManagerService { keyArr.map((key: string) => { deleteWork.push(this.redisCache.del(key)); }); // bull.js Queue 지우는용 - const keys = Array.from(this.map.keys()).filter( + const keys = Array.from(this.qMap.keys()).filter( (key) => key.split(':')[0] === recruitId, ); - keys.map((key: string) => this.map.delete(key)); // 매핑된 모든 인스턴스 지우는용 + keys.map((key: string) => this.qMap.delete(key)); // 매핑된 모든 인스턴스 지우는용 } // 서버 메모리에서, name(key) 값으로 Queue Instance 가져와서 반환해주기 getQueue(name: string): Bull.Queue { - console.log('getQueue: ', this.map); - return this.map.get(name); + return this.qMap.get(name); } getQueueList(recruitId: string) { - console.log('getQueueList: ', this.map); - const keys = Array.from(this.map.keys()).filter( + const keys = Array.from(this.qMap.keys()).filter( (key) => key.split(':')[0] === recruitId, ); - return keys.map((key) => this.map.get(key)); + return keys.map((key) => this.qMap.get(key)); } -} -// 온라인인 유저 상태 관리 -> 그래야 send 이벤트를 발생을 시켜서 값을 가져오니깐 -// 온라인이면 -> 바로 전송 -// 오프라인이면 -> 큐에 누적 + async getQueueSize(name: string) { + const queue = this.qMap.get(name); + if (!queue) return 0; + const { waiting } = await queue.getJobCounts(); + return waiting; + } + + getSocket(name: string): Socket { + return this.sMap.get(name); + } -// 지금 Cache Module에서 사용하는건, 레디스인데, Cache Module 메모리와 레디스를 함께 쓸 순 없을까? + setSocket(userId: string, socket: Socket): void { + this.sMap.set(userId, socket); + } + + deleteSocket(userId: string): void { + this.sMap.delete(userId); + } +} diff --git a/chat-server/src/socket.controller.ts b/chat-server/src/socket.controller.ts index a55b673..485d8d3 100644 --- a/chat-server/src/socket.controller.ts +++ b/chat-server/src/socket.controller.ts @@ -1,4 +1,5 @@ import { Body, Controller, Delete, Get, Post, Query } from '@nestjs/common'; +import { throwIfEmpty } from 'rxjs'; import { ManagerService } from './queue-manager/manager.service'; import { SocketService } from './socket.service'; @@ -44,4 +45,24 @@ export class SocketController { await this.managerService.deleteManyQueue(recruitId); return { statusCode: 201 }; } + + // GET localhost:8080/chat?userId=pushedrumex&page=2&recruidId=1 + @Get('chat') + async getChat( + @Query() query: { userId: string; page: number; recruitId: number }, + ) { + const { recruitId, userId, page } = query; + const unReadCount = await this.managerService.getQueueSize( + `${recruitId}:${userId}`, + ); + const data = await this.socketService.getRecentMessage( + recruitId, + page, + unReadCount, + ); + return { + statusCode: 200, + data, + }; + } } diff --git a/chat-server/src/socket.gateway.ts b/chat-server/src/socket.gateway.ts index 2384903..c736d79 100644 --- a/chat-server/src/socket.gateway.ts +++ b/chat-server/src/socket.gateway.ts @@ -33,17 +33,19 @@ export class SocketGateway implements OnGatewayDisconnect { @ConnectedSocket() socket: Socket, ): Promise { const { recruitId, userId } = data; - console.log(recruitId, userId); await this.socketService.setCacheData(socket.id, data); + this.queueService.setSocket(userId, socket); // 소켓 인스턴스 저장 const queue = this.queueService.getQueue(`${recruitId}:${userId}`); - if (!queue) return; - try { - await queue.process((job, done) => { - socket.emit('server_sent_unread', job.data); - done(); - }); - } catch (err) {} - await queue.resume(); + if (queue) { + try { + queue.process((job, done) => { + const socketInstance = this.queueService.getSocket(userId); + socketInstance.emit('server_sent_unread', job.data); + done(); + }); + } catch (err) {} + await queue.resume(); + } // + 이전 메시지를 리버스 인피니트 스크롤로 보내주기 } @@ -54,7 +56,6 @@ export class SocketGateway implements OnGatewayDisconnect { ): Promise { // 해당 모집에 있는 모든 사용자의 큐에 넣어주기 const { content } = data; - console.log('client_sent', data); const { userId, recruitId } = await this.socketService.getCacheData( socket.id, ); @@ -64,7 +65,6 @@ export class SocketGateway implements OnGatewayDisconnect { chat.recruitId = recruitId; chat.content = content; chat.createdAt = new Date(); - console.log('client_sent:queueList', queueList); const addWork = []; queueList.map((queue: Bull.Queue) => { addWork.push(queue.add(chat)); @@ -74,13 +74,13 @@ export class SocketGateway implements OnGatewayDisconnect { } async handleDisconnect(@ConnectedSocket() socket: Socket): Promise { - console.log('끊김'); const { recruitId, userId } = await this.socketService.getCacheData( socket.id, ); const queue = this.queueService.getQueue(`${recruitId}:${userId}`); + this.queueService.deleteSocket(userId); + await this.socketService.delCacheData(socket.id); if (!queue) return; queue.pause(); - await this.socketService.delCacheData(socket.id); } } diff --git a/chat-server/src/socket.service.ts b/chat-server/src/socket.service.ts index 07a7db1..c11fb60 100644 --- a/chat-server/src/socket.service.ts +++ b/chat-server/src/socket.service.ts @@ -29,16 +29,16 @@ export class SocketService { return this.cacheManager.del(`id:${socketId}`); } - async getRecentMessage(recruitId: number) { + async getRecentMessage(recruitId: number, page = 1, unReadCount = 0) { const response = await this.chatModel .find({ recruitId }) .sort({ createdAt: -1 }) - .limit(10); + .skip((page - 1) * 20 + unReadCount) + .limit(20); return response.reverse(); } async saveRecentMessage(chatEntity: Chat) { - console.log('saveRecentMessage', chatEntity); return this.chatModel.create(chatEntity); } }