Skip to content

Commit

Permalink
Merge pull request #251 from boostcampwm-2024/refactor/socket/unsubsc…
Browse files Browse the repository at this point in the history
…ribe-#250

[BE] 27.10 실시간 체결가 범위 확장 #250
  • Loading branch information
uuuo3o authored Dec 3, 2024
2 parents 1aa4623 + 9c7248a commit 9193a08
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 73 deletions.
4 changes: 2 additions & 2 deletions BE/src/asset/asset.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ export class AssetService {
const userStocks: UserStock[] =
await this.userStockRepository.findAllDistinctCode(userId);

userStocks.map((userStock) =>
this.stockPriceSocketService.unsubscribeByCode(userStock.stock_code),
this.stockPriceSocketService.unsubscribeByCode(
userStocks.map((userStock) => userStock.stock_code),
);
}

Expand Down
8 changes: 3 additions & 5 deletions BE/src/stock/order/stock-order.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ export class StockOrderService {
status: StatusType.PENDING,
}))
)
this.stockPriceSocketService.unsubscribeByCode(order.stock_code);
this.stockPriceSocketService.unsubscribeByCode([order.stock_code]);
}

async getPendingListByUserId(userId: number) {
Expand All @@ -135,10 +135,8 @@ export class StockOrderService {
const orders: Order[] =
await this.stockOrderRepository.findAllCodeByStatus();

await Promise.all(
orders.map((order) =>
this.stockPriceSocketService.unsubscribeByCode(order.stock_code),
),
this.stockPriceSocketService.unsubscribeByCode(
orders.map((order) => order.stock_code),
);

await this.stockOrderRepository.delete({ status: StatusType.PENDING });
Expand Down
57 changes: 19 additions & 38 deletions BE/src/stock/trade/history/stock-trade-history.controller.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import { Observable } from 'rxjs';
import { Controller, Get, Param, Sse } from '@nestjs/common';
import { ApiOperation, ApiParam, ApiResponse, ApiTags } from '@nestjs/swagger';
import { Controller, Get, Param, Query } from '@nestjs/common';
import {
ApiOperation,
ApiParam,
ApiQuery,
ApiResponse,
ApiTags,
} from '@nestjs/swagger';
import { StockTradeHistoryService } from './stock-trade-history.service';
import { TodayStockTradeHistoryDataDto } from './dto/today-stock-trade-history-data.dto';
import { DailyStockTradeHistoryDataDto } from './dto/daily-stock-trade-history-data.dto';
import { SseEvent } from './interface/sse-event';
import { StockPriceSocketService } from '../../../stockSocket/stock-price-socket.service';

@ApiTags('주식현재가 체결 조회 API')
Expand Down Expand Up @@ -51,50 +55,27 @@ export class StockTradeHistoryController {
return this.stockTradeHistoryService.getDailyStockTradeHistory(stockCode);
}

@Sse(':stockCode/today-sse')
@ApiOperation({ summary: '단일 주식 종목에 대한 실시간 체결 데이터 스트림' })
@ApiParam({
name: 'stockCode',
required: true,
description:
'종목 코드\n\n' +
'(ex) 005930 삼성전자 / 005380 현대차 / 001500 현대차증권',
})
@ApiResponse({
status: 200,
description:
'단일 주식 종목에 대한 주식현재가 체결값 실시간 데이터 조회 성공',
type: TodayStockTradeHistoryDataDto,
})
streamTradeHistory(@Param('stockCode') stockCode: string) {
this.stockPriceSocketService.subscribeByCode(stockCode);

return new Observable<SseEvent>((subscriber) => {
const subscription = this.stockPriceSocketService
.getTradeDataStream(stockCode)
.subscribe(subscriber);

return () => {
this.stockPriceSocketService.unsubscribeByCode(stockCode);
subscription.unsubscribe();
};
});
}

@Get(':stockCode/unsubscribe')
@Get('/unsubscribe')
@ApiOperation({ summary: '페이지를 벗어날 때 구독을 취소하기 위한 API' })
@ApiParam({
@ApiQuery({
name: 'stockCode',
required: true,
description:
'종목 코드\n\n' +
'(ex) 005930 삼성전자 / 005380 현대차 / 001500 현대차증권',
schema: {
type: 'array',
items: {
type: 'string',
},
},
})
@ApiResponse({
status: 200,
description: '구독 취소 성공',
})
unsubscribeCode(@Param('stockCode') stockCode: string) {
this.stockPriceSocketService.unsubscribeByCode(stockCode);
unsubscribeCode(@Query('stockCode') stockCode: string | string[]) {
const stockCodeArray = Array.isArray(stockCode) ? stockCode : [stockCode];
this.stockPriceSocketService.unsubscribeByCode(stockCodeArray);
}
}
39 changes: 11 additions & 28 deletions BE/src/stockSocket/stock-price-socket.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Injectable, InternalServerErrorException } from '@nestjs/common';
import { filter, map, Observable, Subject } from 'rxjs';
import { BaseSocketDomainService } from '../common/websocket/base-socket.domain-service';
import { SocketGateway } from '../common/websocket/socket.gateway';
import { BaseStockSocketDomainService } from './base-stock-socket.domain-service';
Expand All @@ -8,12 +7,10 @@ import { StatusType } from '../stock/order/enum/status-type';
import { TodayStockTradeHistoryDataDto } from '../stock/trade/history/dto/today-stock-trade-history-data.dto';
import { StockDetailSocketDataDto } from '../stock/trade/history/dto/stock-detail-socket-data.dto';
import { StockExecuteOrderRepository } from './stock-execute-order.repository';
import { SseEvent } from '../stock/trade/history/interface/sse-event';

@Injectable()
export class StockPriceSocketService extends BaseStockSocketDomainService {
private connection: { [key: string]: number } = {};
private eventSubject = new Subject<SseEvent>();

constructor(
protected readonly socketGateway: SocketGateway,
Expand Down Expand Up @@ -55,12 +52,6 @@ export class StockPriceSocketService extends BaseStockSocketDomainService {
prdy_ctrt: data[5],
};

this.eventSubject.next({
data: JSON.stringify({
tradeData,
}),
});

this.socketGateway.sendStockIndexValueToClient(
`trade-history/${data[0]}`,
tradeData,
Expand All @@ -72,16 +63,6 @@ export class StockPriceSocketService extends BaseStockSocketDomainService {
);
}

getTradeDataStream(targetStockCode: string): Observable<SseEvent> {
return this.eventSubject.pipe(
filter((event: SseEvent) => {
const parsed = JSON.parse(event.data);
return parsed.tradeData.stck_shrn_iscd === targetStockCode;
}),
map((event: SseEvent) => event),
);
}

subscribeByCode(trKey: string) {
this.baseSocketDomainService.registerCode(this.TR_ID, trKey);

Expand All @@ -92,14 +73,16 @@ export class StockPriceSocketService extends BaseStockSocketDomainService {
this.connection[trKey] = 1;
}

unsubscribeByCode(trKey: string) {
if (!this.connection[trKey]) return;
if (this.connection[trKey] > 1) {
this.connection[trKey] -= 1;
return;
}
delete this.connection[trKey];
this.baseSocketDomainService.unregisterCode(this.TR_ID, trKey);
unsubscribeByCode(trKeys: string[]) {
trKeys.forEach((trKey) => {
if (!this.connection[trKey]) return;
if (this.connection[trKey] > 1) {
this.connection[trKey] -= 1;
return;
}
delete this.connection[trKey];
this.baseSocketDomainService.unregisterCode(this.TR_ID, trKey);
});
}

private async checkExecutableOrder(stockCode: string, value) {
Expand All @@ -122,6 +105,6 @@ export class StockPriceSocketService extends BaseStockSocketDomainService {
status: StatusType.PENDING,
}))
)
this.unsubscribeByCode(stockCode);
this.unsubscribeByCode([stockCode]);
}
}

0 comments on commit 9193a08

Please sign in to comment.