Skip to content

Commit

Permalink
feat: 채팅 서버 구현 완료 v2.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
June1010 committed Dec 13, 2022
1 parent 294e85a commit cbd76f9
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 41 deletions.
1 change: 1 addition & 0 deletions chat-server/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { SocketModule } from './socket.module';

async function bootstrap() {
const app = await NestFactory.create(SocketModule);
app.enableCors({ origin: '*', credentials: true });
await app.listen(8080);
}
bootstrap();
1 change: 0 additions & 1 deletion chat-server/src/queue-manager/manager.module.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { CacheModule, Module } from '@nestjs/common';
import * as redisStore from 'cache-manager-ioredis';
// import type { ClientOpts } from 'redis';
import { ManagerService } from './manager.service';
import { BullModule } from '@nestjs/bull';

Expand Down
52 changes: 28 additions & 24 deletions chat-server/src/queue-manager/manager.service.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,29 @@
import { CACHE_MANAGER, Inject, Injectable, Scope } from '@nestjs/common';
import { Cache } from 'cache-manager';
import * as Bull from 'bull';
import { Socket } from 'socket.io';

@Injectable({ scope: Scope.DEFAULT })
export class ManagerService {
constructor(
@Inject(CACHE_MANAGER) private redisCache: Cache,
@Inject(Map) private map: Map<string, Bull.Queue>,
@Inject(Map) private qMap: Map<string, Bull.Queue>,
@Inject(Map) private sMap: Map<string, Socket>,
) {}
async generateQueue(name: string) {
const queue = new Bull(name);
queue.pause();
this.map.set(name, queue); // 서버 메모리에 key: "7:June1010", val: Queue Instance 저장해주기
console.log('generateQueue: ', this.map);
// queue.process(메시지 전송하는 콜백함수) [2] : process 등록
// queue.pause() [1] : 모집신청/참가신청
// queue.resume() [3] : 온라인일경우 process 재개 socket.onconnect()
// socket.ondisconnect() -> pause()
// socket에 합치는게 좋을 듯 -> 석준의 의견
this.qMap.set(name, queue);
return queue;
}

// 특정 큐에 소켓 전송 이벤트 emit/broadcast하는 걸 여기에 등록해주는 메서드.
// registerCallback(name: string, func: Function) {queue.process(func)}

async deleteOneQueue(name: string) {
const deleteWork = [];
const keyArr = await this.redisCache.store.keys(`bull:${name}:*`);
keyArr.map((key: string) => {
deleteWork.push(this.redisCache.del(key));
}); // bull.js Queue 지우는용
this.map.delete(name); // 매핑된 인스턴스 지우는용
this.qMap.delete(name); // 매핑된 인스턴스 지우는용
return Promise.all(deleteWork);
}

Expand All @@ -40,29 +33,40 @@ export class ManagerService {
keyArr.map((key: string) => {
deleteWork.push(this.redisCache.del(key));
}); // bull.js Queue 지우는용
const keys = Array.from(this.map.keys()).filter(
const keys = Array.from(this.qMap.keys()).filter(
(key) => key.split(':')[0] === recruitId,
);
keys.map((key: string) => this.map.delete(key)); // 매핑된 모든 인스턴스 지우는용
keys.map((key: string) => this.qMap.delete(key)); // 매핑된 모든 인스턴스 지우는용
}

// 서버 메모리에서, name(key) 값으로 Queue Instance 가져와서 반환해주기
getQueue(name: string): Bull.Queue {
console.log('getQueue: ', this.map);
return this.map.get(name);
return this.qMap.get(name);
}

getQueueList(recruitId: string) {
console.log('getQueueList: ', this.map);
const keys = Array.from(this.map.keys()).filter(
const keys = Array.from(this.qMap.keys()).filter(
(key) => key.split(':')[0] === recruitId,
);
return keys.map((key) => this.map.get(key));
return keys.map((key) => this.qMap.get(key));
}
}

// 온라인인 유저 상태 관리 -> 그래야 send 이벤트를 발생을 시켜서 값을 가져오니깐
// 온라인이면 -> 바로 전송
// 오프라인이면 -> 큐에 누적
async getQueueSize(name: string) {
const queue = this.qMap.get(name);
if (!queue) return 0;
const { waiting } = await queue.getJobCounts();
return waiting;
}

getSocket(name: string): Socket {
return this.sMap.get(name);
}

// 지금 Cache Module에서 사용하는건, 레디스인데, Cache Module 메모리와 레디스를 함께 쓸 순 없을까?
setSocket(userId: string, socket: Socket): void {
this.sMap.set(userId, socket);
}

deleteSocket(userId: string): void {
this.sMap.delete(userId);
}
}
21 changes: 21 additions & 0 deletions chat-server/src/socket.controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Body, Controller, Delete, Get, Post, Query } from '@nestjs/common';
import { throwIfEmpty } from 'rxjs';
import { ManagerService } from './queue-manager/manager.service';
import { SocketService } from './socket.service';

Expand Down Expand Up @@ -44,4 +45,24 @@ export class SocketController {
await this.managerService.deleteManyQueue(recruitId);
return { statusCode: 201 };
}

// GET localhost:8080/chat?userId=pushedrumex&page=2&recruidId=1
@Get('chat')
async getChat(
@Query() query: { userId: string; page: number; recruitId: number },
) {
const { recruitId, userId, page } = query;
const unReadCount = await this.managerService.getQueueSize(
`${recruitId}:${userId}`,
);
const data = await this.socketService.getRecentMessage(
recruitId,
page,
unReadCount,
);
return {
statusCode: 200,
data,
};
}
}
26 changes: 13 additions & 13 deletions chat-server/src/socket.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,19 @@ export class SocketGateway implements OnGatewayDisconnect {
@ConnectedSocket() socket: Socket,
): Promise<void> {
const { recruitId, userId } = data;
console.log(recruitId, userId);
await this.socketService.setCacheData(socket.id, data);
this.queueService.setSocket(userId, socket); // 소켓 인스턴스 저장
const queue = this.queueService.getQueue(`${recruitId}:${userId}`);
if (!queue) return;
try {
await queue.process((job, done) => {
socket.emit('server_sent_unread', job.data);
done();
});
} catch (err) {}
await queue.resume();
if (queue) {
try {
queue.process((job, done) => {
const socketInstance = this.queueService.getSocket(userId);
socketInstance.emit('server_sent_unread', job.data);
done();
});
} catch (err) {}
await queue.resume();
}
// + 이전 메시지를 리버스 인피니트 스크롤로 보내주기
}

Expand All @@ -54,7 +56,6 @@ export class SocketGateway implements OnGatewayDisconnect {
): Promise<void> {
// 해당 모집에 있는 모든 사용자의 큐에 넣어주기
const { content } = data;
console.log('client_sent', data);
const { userId, recruitId } = await this.socketService.getCacheData(
socket.id,
);
Expand All @@ -64,7 +65,6 @@ export class SocketGateway implements OnGatewayDisconnect {
chat.recruitId = recruitId;
chat.content = content;
chat.createdAt = new Date();
console.log('client_sent:queueList', queueList);
const addWork = [];
queueList.map((queue: Bull.Queue) => {
addWork.push(queue.add(chat));
Expand All @@ -74,13 +74,13 @@ export class SocketGateway implements OnGatewayDisconnect {
}

async handleDisconnect(@ConnectedSocket() socket: Socket): Promise<void> {
console.log('끊김');
const { recruitId, userId } = await this.socketService.getCacheData(
socket.id,
);
const queue = this.queueService.getQueue(`${recruitId}:${userId}`);
this.queueService.deleteSocket(userId);
await this.socketService.delCacheData(socket.id);
if (!queue) return;
queue.pause();
await this.socketService.delCacheData(socket.id);
}
}
6 changes: 3 additions & 3 deletions chat-server/src/socket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ export class SocketService {
return this.cacheManager.del(`id:${socketId}`);
}

async getRecentMessage(recruitId: number) {
async getRecentMessage(recruitId: number, page = 1, unReadCount = 0) {
const response = await this.chatModel
.find({ recruitId })
.sort({ createdAt: -1 })
.limit(10);
.skip((page - 1) * 20 + unReadCount)
.limit(20);
return response.reverse();
}

async saveRecentMessage(chatEntity: Chat) {
console.log('saveRecentMessage', chatEntity);
return this.chatModel.create(chatEntity);
}
}

0 comments on commit cbd76f9

Please sign in to comment.