Skip to content

Commit

Permalink
Merge pull request #142 from boostcampwm-2024/be/fix/websockets
Browse files Browse the repository at this point in the history
[BE/feature] 보유 수량 웹소켓 전송 구현
  • Loading branch information
DongKey777 authored Dec 2, 2024
2 parents 732e598 + 9127130 commit a16bbc0
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 98 deletions.
16 changes: 16 additions & 0 deletions apps/backend/src/database/database.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ export class DatabaseService implements OnModuleInit {
await this.client.end();
}

async listenToChannel(channel: string, callback: (cropNotice: any) => void): Promise<void> {
await this.client.query(`LISTEN ${channel}`);
this.client.on('notification', msg => {
if (msg.channel === channel) {
try {
if (msg.payload) {
const cropNotice = JSON.parse(msg.payload);
callback(cropNotice);
}
} catch (error) {
console.log(error);
}
}
});
}

async runInTransaction(callback: (client: Client) => Promise<void>): Promise<void> {
try {
await this.client.query('BEGIN');
Expand Down
2 changes: 1 addition & 1 deletion apps/backend/src/global/utils/jwtAuthGuard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export class JwtAuthGuard {
async canActivate(context: any) {
const isPublic = this.reflector.get<boolean>('isPublic', context.getHandler());
if (isPublic) {
return true; // @Public() 데코레이터가 설정된 경로는 Guard 제외
return true;
}
const request = context.switchToHttp().getRequest();
const token = request.headers.authorization?.split(' ')[1];
Expand Down
157 changes: 61 additions & 96 deletions apps/backend/src/websocket/websocket.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,134 +9,78 @@ import { Server, Socket } from 'socket.io';
import { Inject } from '@nestjs/common';
import { RedisClientType } from 'redis';
import { DatabaseService } from '../database/database.service';
import { JwtService } from '@nestjs/jwt';

@WebSocketGateway({ cors: true })
export class WebsocketGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer() server: Server;

private clients: Map<string, Socket> = new Map();
private intervals: Map<string, NodeJS.Timeout> = new Map();
private clients: Map<string, string> = new Map();

constructor(
private readonly databaseService: DatabaseService,
private readonly jwtService: JwtService,
@Inject('REDIS_CLIENT') private readonly redisClient: RedisClientType
) {
this.initializePubSub();
) {}

async onModuleInit() {
await this.initializePubSub();
}

afterInit() {
console.log('WebSocket Gateway Initialized');
}

handleConnection(client: Socket) {
async handleConnection(client: Socket) {
const token = client.handshake.auth.authorization?.split(': ')[1];
if (!token) {
client.disconnect(true);
return;
}
try {
const decoded = await this.jwtService.verifyAsync(token);
const { memberId, nickname } = decoded;

client.data.memberId = memberId;
client.data.nickname = nickname;

this.clients.set(memberId, client.id);
await this.sendMemberCropsData(client, memberId);
} catch (error) {
client.disconnect(true);
console.log(error);
}

client.on('join', async data => {
const { cropId } = data;
if (cropId) {
this.clients.set(client.id, client);
client.join(String(cropId));
this.sendCurrentMarketState(client, cropId);

await this.sendInitialData(cropId);
this.startDataPolling(cropId);
}
});
}

handleDisconnect(client: Socket) {
this.clients.delete(client.id);
this.stopDataPolling(client.id);
console.log(`Client disconnected: ${client.id}`);
}

private async sendInitialData(cropId: string) {
const data = await this.databaseService.query(`SELECT * FROM crop_prices WHERE crop_id = $1`, [
cropId
]);
const formattedData = this.groupDataByMinute(data.rows);
const chartData = this.fillMissingData(formattedData);

this.chartDataTransfer(cropId, chartData);
}

private startDataPolling(cropId: string) {
if (!this.intervals.has(cropId)) {
const interval = setInterval(async () => {
const data = await this.databaseService.query(
`SELECT * FROM crop_prices WHERE crop_id = $1`,
[cropId]
);
const formattedData = this.groupDataByMinute(data.rows);
const chartData = this.fillMissingData(formattedData);

this.chartDataTransfer(cropId, chartData);
}, 30000);
this.intervals.set(cropId, interval);
}
}

private stopDataPolling(clientId: string) {
this.clients.forEach((_, cropId) => {
if (!this.clients.has(clientId)) {
const interval = this.intervals.get(cropId);
if (interval) {
clearInterval(interval);
this.intervals.delete(cropId);
}
}
});
}

private groupDataByMinute(rows: any[]) {
const groupedData: { [minute: number]: { x: Date; y: number } } = {};

rows.forEach(row => {
const date = new Date(row.time);
const minute = Math.floor(date.getTime() / 60000) * 60000;
groupedData[minute] = { x: new Date(minute), y: Number(row.price) };
});

return Object.values(groupedData);
}

private fillMissingData(data: { x: Date; y: number }[]): { x: Date; y: number }[] {
const filledData: { x: Date; y: number }[] = [];
let lastValue: { x: Date; y: number } | null = null;

for (let i = 0; i < data.length; i++) {
const current = data[i];

if (!lastValue) {
filledData.push(current);
lastValue = current;
continue;
}

const timeDiff = current.x.getTime() - lastValue.x.getTime();

const oneMinute = 60 * 1000;
if (timeDiff > oneMinute) {
const numMissingPoints = Math.floor(timeDiff / oneMinute);

for (let j = 1; j <= numMissingPoints - 1; j++) {
filledData.push({
x: new Date(lastValue.x.getTime() + j * oneMinute),
y: Number(lastValue.y)
});
}
}

filledData.push(current);
lastValue = current;
}

return filledData;
}

async initializePubSub() {
const subscriber = this.redisClient.duplicate();
try {
await subscriber.connect();

await this.databaseService.listenToChannel('member_crops_update', async payload => {
const memberId = payload[0].member_id;
const query = `
SELECT crop_id, available_quantity, pending_quantity, total_quantity
FROM member_crops
WHERE member_id = $1
`;
const crops = await this.databaseService.query(query, [memberId]);
this.cropDataTransfer(memberId, crops.rows);
});

await subscriber.pSubscribe('__keyspace@0__:orderBook:*', async (_, message) => {
const match = message.match(/orderBook:(\d+):.*/);
if (match) {
Expand All @@ -145,10 +89,21 @@ export class WebsocketGateway implements OnGatewayInit, OnGatewayConnection, OnG
}
});
} catch (error) {
console.error(error);
console.log(error);
}
}

private async sendMemberCropsData(client: Socket, memberId: string) {
const query = `
SELECT crop_id, available_quantity, pending_quantity, total_quantity
FROM member_crops
WHERE member_id = $1
`;

const crops = await this.databaseService.query(query, [memberId]);
client.emit('crops', crops.rows);
}

private async handleRedisUpdate(cropId: string) {
const buyOrders = await this.redisClient.zRange(`orderBook:${cropId}:buy:limit`, 0, -1);
const sellOrders = await this.redisClient.zRange(`orderBook:${cropId}:sell:limit`, 0, -1);
Expand All @@ -170,7 +125,7 @@ export class WebsocketGateway implements OnGatewayInit, OnGatewayConnection, OnG
rawOrders.forEach(orderString => {
const order = JSON.parse(orderString);
const price = order.price;
const quantity = order.quantity;
const quantity = order.unfilledQuantity;

if (orderMap.has(price)) {
orderMap.set(price, orderMap.get(price)! + quantity);
Expand All @@ -193,6 +148,16 @@ export class WebsocketGateway implements OnGatewayInit, OnGatewayConnection, OnG
this.server.to(String(cropId)).emit('chart', data);
}

async cropDataTransfer(memberId: string, data: any) {
const clientId = this.clients.get(memberId);
if (clientId) {
const client = this.server.sockets.sockets.get(clientId);
if (client) {
client.emit('crops', data);
}
}
}

async sendCurrentMarketState(client: Socket, cropId: string) {
const buyOrders = await this.redisClient.zRange(`orderBook:${cropId}:buy:limit`, 0, -1);
const sellOrders = await this.redisClient.zRange(`orderBook:${cropId}:sell:limit`, 0, -1);
Expand Down
14 changes: 13 additions & 1 deletion apps/backend/src/websocket/websocket.module.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
import { Module } from '@nestjs/common';
import { DatabaseModule } from 'src/database/database.module';
import { WebsocketGateway } from './websocket.gateway';
import { JwtModule } from '@nestjs/jwt';
import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
imports: [DatabaseModule],
imports: [
DatabaseModule,
JwtModule.registerAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: async (configService: ConfigService) => ({
secret: configService.get<string>('JWT_SECRET'),
signOptions: { expiresIn: '1h' }
})
})
],
providers: [WebsocketGateway]
})
export class WebsocketModule {}

0 comments on commit a16bbc0

Please sign in to comment.