Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: chat server ver 2.1.0 #241

Merged
merged 9 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,002 changes: 851 additions & 151 deletions chat-server/package-lock.json

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion chat-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"test:e2e": "jest --config ./test/jest-e2e.json"
},
"dependencies": {
"@nestjs/bull": "^0.6.2",
"@nestjs/common": "^9.0.0",
"@nestjs/config": "^2.2.0",
"@nestjs/core": "^9.0.0",
Expand All @@ -29,9 +30,14 @@
"@nestjs/platform-socket.io": "^9.2.1",
"@nestjs/typeorm": "^9.0.1",
"@nestjs/websockets": "^9.2.1",
"cache-manager": "^5.1.4",
"@types/bull": "^4.10.0",
"bull": "^4.10.2",
"cache-manager": "^4.0.0",
"cache-manager-ioredis": "^2.1.0",
"ioredis": "^5.2.4",
"mongoose": "^6.8.0",
"mysql2": "^2.3.3",
"redis": "^3.1.2",
"reflect-metadata": "^0.1.13",
"rimraf": "^3.0.2",
"rxjs": "^7.2.0",
Expand All @@ -45,6 +51,7 @@
"@types/express": "^4.17.13",
"@types/jest": "28.1.8",
"@types/node": "^16.0.0",
"@types/redis": "^4.0.11",
"@types/supertest": "^2.0.11",
"@typescript-eslint/eslint-plugin": "^5.0.0",
"@typescript-eslint/parser": "^5.0.0",
Expand Down
27 changes: 0 additions & 27 deletions chat-server/src/common/entities/chat.entity.ts

This file was deleted.

2 changes: 1 addition & 1 deletion chat-server/src/common/schemas/chat.schema.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose';
import { HydratedDocument } from 'mongoose';
import { HydratedDocument, now } from 'mongoose';

export type ChatDocument = HydratedDocument<Chat>;

Expand Down
1 change: 1 addition & 0 deletions chat-server/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { SocketModule } from './socket.module';

async function bootstrap() {
const app = await NestFactory.create(SocketModule);
app.enableCors({ origin: '*', credentials: true });
await app.listen(8080);
}
bootstrap();
23 changes: 23 additions & 0 deletions chat-server/src/queue-manager/manager.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { CacheModule, Module } from '@nestjs/common';
import * as redisStore from 'cache-manager-ioredis';
import { ManagerService } from './manager.service';
import { BullModule } from '@nestjs/bull';

@Module({
imports: [
CacheModule.registerAsync({
useFactory: () => {
return {
store: redisStore,
host: 'localhost',
port: 6379,
ttl: 0,
};
},
}),
BullModule,
],
providers: [ManagerService, Map],
exports: [ManagerService],
})
export class ManagerModule {}
72 changes: 72 additions & 0 deletions chat-server/src/queue-manager/manager.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { CACHE_MANAGER, Inject, Injectable, Scope } from '@nestjs/common';
import { Cache } from 'cache-manager';
import * as Bull from 'bull';
import { Socket } from 'socket.io';

@Injectable({ scope: Scope.DEFAULT })
export class ManagerService {
constructor(
@Inject(CACHE_MANAGER) private redisCache: Cache,
@Inject(Map) private qMap: Map<string, Bull.Queue>,
@Inject(Map) private sMap: Map<string, Socket>,
) {}
async generateQueue(name: string) {
const queue = new Bull(name);
queue.pause();
this.qMap.set(name, queue);
return queue;
}

async deleteOneQueue(name: string) {
const deleteWork = [];
const keyArr = await this.redisCache.store.keys(`bull:${name}:*`);
keyArr.map((key: string) => {
deleteWork.push(this.redisCache.del(key));
}); // bull.js Queue 지우는용
this.qMap.delete(name); // 매핑된 인스턴스 지우는용
return Promise.all(deleteWork);
}

async deleteManyQueue(recruitId: string) {
const deleteWork = [];
const keyArr = await this.redisCache.store.keys(`bull:${recruitId}:*`);
keyArr.map((key: string) => {
deleteWork.push(this.redisCache.del(key));
}); // bull.js Queue 지우는용
const keys = Array.from(this.qMap.keys()).filter(
(key) => key.split(':')[0] === recruitId,
);
keys.map((key: string) => this.qMap.delete(key)); // 매핑된 모든 인스턴스 지우는용
}

// 서버 메모리에서, name(key) 값으로 Queue Instance 가져와서 반환해주기
getQueue(name: string): Bull.Queue {
return this.qMap.get(name);
}

getQueueList(recruitId: string) {
const keys = Array.from(this.qMap.keys()).filter(
(key) => key.split(':')[0] === recruitId,
);
return keys.map((key) => this.qMap.get(key));
}

async getQueueSize(name: string) {
const queue = this.qMap.get(name);
if (!queue) return 0;
const { waiting } = await queue.getJobCounts();
return waiting;
}

getSocket(name: string): Socket {
return this.sMap.get(name);
}

setSocket(userId: string, socket: Socket): void {
this.sMap.set(userId, socket);
}

deleteSocket(userId: string): void {
this.sMap.delete(userId);
}
}
68 changes: 68 additions & 0 deletions chat-server/src/socket.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { Body, Controller, Delete, Get, Post, Query } from '@nestjs/common';
import { throwIfEmpty } from 'rxjs';
import { ManagerService } from './queue-manager/manager.service';
import { SocketService } from './socket.service';

type BodyDto = {
recruitId: string;
userId?: string;
};
@Controller()
export class SocketController {
constructor(
private managerService: ManagerService,
private socketService: SocketService,
) {}

@Get('unread')
async getUnreadMessage(@Query() bodyDto: BodyDto) {
const { recruitId, userId } = bodyDto;
const queue = this.managerService.getQueue(`${recruitId}:${userId}`);
const { waiting } = await queue.getJobCounts();
return { statusCode: 201, data: { waiting } };
}

// POST localhost:8080/queue {recruitId, userId}
@Post('queue')
async generate(@Body() bodyDto: BodyDto) {
const { recruitId, userId } = bodyDto;
await this.managerService.generateQueue(`${recruitId}:${userId}`);
return { statusCode: 201 };
}

// POST localhost:8080/queue/delete/one {recruitId, userId}
@Post('queue/delete/one')
async deleteOne(@Body() bodyDto: BodyDto) {
const { recruitId, userId } = bodyDto;
await this.managerService.deleteOneQueue(`${recruitId}:${userId}`);
return { statusCode: 201 };
}

// POST localhost:8080/queue/delete/many {recruitId}
@Post('queue/delete/many')
async deleteMany(@Body() bodyDto: BodyDto) {
const { recruitId } = bodyDto;
await this.managerService.deleteManyQueue(recruitId);
return { statusCode: 201 };
}

// GET localhost:8080/chat?userId=pushedrumex&page=2&recruidId=1
@Get('chat')
async getChat(
@Query() query: { userId: string; page: number; recruitId: number },
) {
const { recruitId, userId, page } = query;
const unReadCount = await this.managerService.getQueueSize(
`${recruitId}:${userId}`,
);
const data = await this.socketService.getRecentMessage(
recruitId,
page,
unReadCount,
);
return {
statusCode: 200,
data,
};
}
}
50 changes: 37 additions & 13 deletions chat-server/src/socket.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import {
WebSocketServer,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Chat } from './common/entities/chat.entity';
import { Chat } from './common/schemas/chat.schema';
import { ManagerService } from './queue-manager/manager.service';
import { SocketService } from './socket.service';
import * as Bull from 'bull';

@WebSocketGateway({
namespace: 'chat',
Expand All @@ -18,45 +20,67 @@ import { SocketService } from './socket.service';
},
})
export class SocketGateway implements OnGatewayDisconnect {
constructor(private socketService: SocketService) {}
constructor(
private socketService: SocketService,
private queueService: ManagerService,
) {}

@WebSocketServer() public server: Server;

@SubscribeMessage('join')
async handleLogin(
async handleJoin(
@MessageBody() data: any,
@ConnectedSocket() socket: Socket,
): Promise<void> {
const { recruitId } = data;
socket.join(recruitId.toString()); // room에 입장
const { recruitId, userId } = data;
await this.socketService.setCacheData(socket.id, data);
const recentMsg = await this.socketService.getRecentMessage(recruitId);
socket.emit('server_sent_recent', recentMsg);
this.queueService.setSocket(userId, socket); // 소켓 인스턴스 저장
const queue = this.queueService.getQueue(`${recruitId}:${userId}`);
if (queue) {
try {
queue.process((job, done) => {
const socketInstance = this.queueService.getSocket(userId);
socketInstance.emit('server_sent_unread', job.data);
done();
});
} catch (err) {}
await queue.resume();
}
// + 이전 메시지를 리버스 인피니트 스크롤로 보내주기
}

@SubscribeMessage('client_sent')
async handleEvent(
@MessageBody() data: { content: string },
@ConnectedSocket() socket: Socket,
): Promise<void> {
// 해당 모집에 있는 모든 사용자의 큐에 넣어주기
const { content } = data;
const { userId, recruitId } = await this.socketService.getCacheData(
socket.id,
);
const queueList = this.queueService.getQueueList(recruitId.toString());
const chat = new Chat();
chat.sender = userId;
chat.recruitId = recruitId;
chat.content = content;
chat.createdAt = new Date();
this.server
.in(recruitId.toString())
.emit('server_sent', await this.socketService.saveRecentMessage(chat));
const addWork = [];
queueList.map((queue: Bull.Queue) => {
addWork.push(queue.add(chat));
});
Promise.all(addWork);
await this.socketService.saveRecentMessage(chat);
}

async handleDisconnect(@ConnectedSocket() socket: Socket): Promise<void> {
const { recruitId, userId } = await this.socketService.getCacheData(
socket.id,
);
const queue = this.queueService.getQueue(`${recruitId}:${userId}`);
this.queueService.deleteSocket(userId);
await this.socketService.delCacheData(socket.id);
}
async handleConnection(@ConnectedSocket() socket: Socket): Promise<void> {
await this.socketService.delCacheData(socket.id);
if (!queue) return;
queue.pause();
}
}
12 changes: 12 additions & 0 deletions chat-server/src/socket.module.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { CacheModule, Module } from '@nestjs/common';
import { SocketGateway } from './socket.gateway';
import { SocketService } from './socket.service';
import { SocketController } from './socket.controller';
import { ConfigModule } from '@nestjs/config';
import { MongooseModule } from '@nestjs/mongoose';
import { Chat, ChatSchema } from './common/schemas/chat.schema';
import { BullModule } from '@nestjs/bull';
import { ManagerModule } from './queue-manager/manager.module';

@Module({
imports: [
Expand All @@ -15,7 +18,16 @@ import { Chat, ChatSchema } from './common/schemas/chat.schema';
CacheModule.register({
ttl: 0,
}),

ManagerModule,
BullModule.forRoot({
redis: {
host: 'redis-server',
port: 6379,
},
}),
],
controllers: [SocketController],
providers: [SocketGateway, SocketService],
})
export class SocketModule {}
Loading