Skip to content

Commit

Permalink
[SCRUM-58] Stream Listener (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jihyun247 authored Dec 18, 2024
1 parent e1446ba commit 468d3be
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 48 deletions.
16 changes: 16 additions & 0 deletions Projects/Core/StreamListener/Interface/AnyStreamContinuation.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import Foundation

public class AnyStreamContinuation {
private let _yield: (Any) -> Void

public init<T>(_ continuation: AsyncStream<T>.Continuation) {
self._yield = { value in
guard let value = value as? T else { return }
continuation.yield(value)
}
}

public func yield(_ value: Any) {
_yield(value)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,49 @@ import Foundation
import Dependencies
import DependenciesMacros

/*
TODO:
StreamListener를 토스트나 다이얼로그, 에러뷰 등 다양한 상황의 스트림을 만들 수 있도록 둘 것인지, serverState 추적만을 하도록 둘 것인지 정해야함 (네이밍 다시 해야함)
++ NetworkTracking도 그 목적이 StreamListener와 유사함
*/
public protocol StreamListenerProtocol {
func send<T: StreamTypeProtocol>(_ state: T) async
func receive<T: StreamTypeProtocol>(_ type: T.Type) -> AsyncStream<T>
}

@DependencyClient
public struct StreamListener {
public var sendServerState: @Sendable (_ state: ServerState) async -> Void
public var updateServerState: @Sendable () -> AsyncStream<ServerState> = { .never }
public var protocolAdapter: StreamListenerProtocol

public init(protocolAdapter: StreamListenerProtocol) {
self.protocolAdapter = protocolAdapter
}
}

extension StreamListener: TestDependencyKey {
public static let previewValue = Self()
public static let testValue = Self()
public static let previewValue = Self(protocolAdapter: StreamListenerTestImpl())
public static let testValue = Self(protocolAdapter: StreamListenerTestImpl())
}

public enum ServerState {
case requestStarted
case requestCompleted
case errorOccured
case networkDisabled
private struct StreamListenerTestImpl: StreamListenerProtocol {
func send<T: StreamTypeProtocol>(_ state: T) async {}
func receive<T: StreamTypeProtocol>(_ type: T.Type) -> AsyncStream<T> { .never }
}

/*
// 서버 상태 변경 시
await send(ServerState.requestStarted)
어떤 stream에 어떤 state를 보낼 것인지

// 스트림 구독
let serverStateStream: AsyncStream<ServerState> = receive(for: ServerState.self)
어떤 stream을 구독할 것인지 , , , , . . ..
*/

/*
catCore - serverstate 네트워크 에러 send
catcore - send와 동시에 core.action에 networkerror 같은 action만들고 어떤 api인지 같이 보냄
catcore - core.action.networkerror([api종류]) 에선, retry 구독, 스트림 받으면 어떤 api인지 알고잇으니 재요청 하면됨 (재요청하고나선 구독한 스트림 꼭 종료)
appCore - serverstate 네트워크 에러 receive
appCore - 네트워크에러 receive 네트워크에러뷰 띄움
retryCore - 재시도 뷰에서 재시도버튼 클릭 시 재시도 send
*/

/*
해야할 것: receive, send 조금 더 수정하기 . . . ..
*/
21 changes: 21 additions & 0 deletions Projects/Core/StreamListener/Interface/Streams.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import Foundation

// MARK: StreamType

public protocol StreamTypeProtocol: Hashable {
static var key: StreamType { get }
}

public enum StreamType: Hashable {
case serverState
case retry
case toast
}

public enum ServerState: StreamTypeProtocol {
public static var key: StreamType { .serverState }
case requestStarted
case requestCompleted
case errorOccured
case networkDisabled
}
49 changes: 28 additions & 21 deletions Projects/Core/StreamListener/Sources/StreamListener.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,39 @@ extension StreamListener: DependencyKey {
public static let liveValue: StreamListener = .live()

public static func live() -> StreamListener {
return .init(protocolAdapter: StreamListenerImpl())
}
}

// 네이밍 추천 plz .,
actor ContinuationActor {
var continuation: AsyncStream<ServerState>.Continuation?
final class StreamListenerImpl: StreamListenerProtocol {
private let actor = StreamActor()

func set(_ newContinuation: AsyncStream<ServerState>.Continuation) {
continuation = newContinuation
}
func send<T: StreamTypeProtocol>(_ state: T) async {
await actor.yield(type: T.key, value: T.self)
}

func yield(_ state: ServerState) {
continuation?.yield(state)
}
func receive<T: StreamTypeProtocol>(_ type: T.Type) -> AsyncStream<T> {
let (stream, continuation) = AsyncStream<T>.makeStream()
Task {
await actor.register(key: T.key, continuation: continuation)
}
return stream
}
}

let continuationActor = ContinuationActor()
let asyncStream = AsyncStream<ServerState> { continuation in
Task { await continuationActor.set(continuation) }
}
private actor StreamActor {
private var streams: [StreamType: AnyStreamContinuation] = [:]

func register<T: StreamTypeProtocol>(key: StreamType, continuation: AsyncStream<T>.Continuation) {
streams[key] = AnyStreamContinuation(continuation)
}

func yield<T: StreamTypeProtocol>(type: StreamType, value: T.Type) {
guard let continuation = streams[type] else { return }
continuation.yield(value)
}

return StreamListener(
sendServerState: { state in
await continuationActor.yield(state)
},
updateServerState: {
return asyncStream
}
)
func remove(type: StreamType) {
streams[type] = nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public struct NamingCatCore {

case let ._postNamedCatRequest(request):
return .run { send in
await self.streamListener.sendServerState(state: .requestStarted)
await self.streamListener.protocolAdapter.send(ServerState.requestStarted)
await send(._postNamedCatResponse(Result {
try await self.catService.changeCatName(apiClient: apiClient, request: request)
}))
Expand All @@ -128,7 +128,7 @@ public struct NamingCatCore {
case ._postNamedCatResponse(.success(_)):
return .run { send in
try await self.userService.syncUserInfo(apiClient: self.apiClient, databaseClient: self.databaseClient)
await self.streamListener.sendServerState(state: .requestCompleted)
await self.streamListener.protocolAdapter.send(ServerState.requestCompleted)
await send(._setNextAction)
}

Expand Down Expand Up @@ -174,14 +174,14 @@ extension NamingCatCore {
networkError.code == .networkConnectionLost ||
networkError.code == .notConnectedToInternet {
return .run { send in
await streamListener.sendServerState(state: .networkDisabled)
await self.streamListener.protocolAdapter.send(ServerState.networkDisabled)
}
}
guard let error = error as? NetworkError else { return .none }
switch error {
case .apiError(_):
return .run { send in
await streamListener.sendServerState(state: .errorOccured)
await self.streamListener.protocolAdapter.send(ServerState.errorOccured)
}
default:
return .none
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public struct SelectCatCore {

case ._fetchCatListRequest:
return .run { send in
await streamListener.sendServerState(state: .requestStarted)
await self.streamListener.protocolAdapter.send(ServerState.requestStarted)
await send(._fetchCatListResponse(Result {
try await catService.getCatList(apiClient)
}))
Expand All @@ -134,15 +134,15 @@ public struct SelectCatCore {
case let ._fetchCatListResponse(.success(response)):
state.catList = response.map { SomeCat(baseInfo: $0) }
return .run { send in
await streamListener.sendServerState(state: .requestCompleted)
await self.streamListener.protocolAdapter.send(ServerState.requestCompleted)
}

case let ._fetchCatListResponse(.failure(error)):
return handleError(error: error)

case let ._postSelectedCatRequest(request):
return .run { send in
await streamListener.sendServerState(state: .requestStarted)
await self.streamListener.protocolAdapter.send(ServerState.requestStarted)
await send(._postSelectedCatResponse(Result {
try await userService.selectCat(apiClient: self.apiClient, request: request)
}))
Expand All @@ -151,7 +151,7 @@ public struct SelectCatCore {
case ._postSelectedCatResponse(.success(_)):
return .run { send in
try await userService.syncUserInfo(apiClient: self.apiClient, databaseClient: self.databaseClient)
await streamListener.sendServerState(state: .requestCompleted)
await self.streamListener.protocolAdapter.send(ServerState.requestCompleted)
await send(._setNextAction)
}

Expand All @@ -174,14 +174,14 @@ extension SelectCatCore {
networkError.code == .networkConnectionLost ||
networkError.code == .notConnectedToInternet {
return .run { send in
await streamListener.sendServerState(state: .networkDisabled)
await self.streamListener.protocolAdapter.send(ServerState.networkDisabled)
}
}
guard let error = error as? NetworkError else { return .none }
switch error {
case .apiError(_):
return .run { send in
await streamListener.sendServerState(state: .errorOccured)
await self.streamListener.protocolAdapter.send(ServerState.errorOccured)
}
default:
return .none
Expand Down
5 changes: 3 additions & 2 deletions Projects/Feature/Feature/Sources/AppCore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Feature
//
// Created by devMinseok on 7/22/24.
// Copyright © 2024 PomoNyang. All rights reserved.
// Copyright 2024 PomoNyang. All rights reserved.
//

import SwiftUI
Expand Down Expand Up @@ -92,7 +92,8 @@ public struct AppCore {
case .onLoad:
state.splash = SplashCore.State()
return .run { send in
for await serverState in streamListener.updateServerState() {
let serverStateStream: AsyncStream<ServerState> = streamListener.protocolAdapter.receive(ServerState.self)
for await serverState in serverStateStream {
await send(.serverState(serverState))
}
}
Expand Down
2 changes: 1 addition & 1 deletion XCFramework/Binary/RealmSwift.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"10.52.3": "https://github.com/realm/realm-swift/releases/download/v10.52.3/Carthage.xcframework.zip"
"10.54.1": "https://github.com/realm/realm-swift/releases/download/v10.54.1/Carthage.xcframework.zip"
}

0 comments on commit 468d3be

Please sign in to comment.