From 9af0592f03678641f1ad318b774db8cbd48b2559 Mon Sep 17 00:00:00 2001 From: Florian Friedrich Date: Tue, 25 May 2021 20:04:18 +0200 Subject: [PATCH] Rework file stream to use file descriptors directly --- Sources/FileStreamer/FileStream.swift | 136 ++++++------------ Tests/FileStreamerTests/FileStreamTests.swift | 29 +--- 2 files changed, 43 insertions(+), 122 deletions(-) diff --git a/Sources/FileStreamer/FileStream.swift b/Sources/FileStreamer/FileStream.swift index 52019c0..6fc7632 100644 --- a/Sources/FileStreamer/FileStream.swift +++ b/Sources/FileStreamer/FileStream.swift @@ -1,112 +1,73 @@ import Dispatch import SystemPackage -public final class FileStream { +public struct FileStream { private typealias FileSource = DispatchSourceRead - - public typealias StateCallback = (FileStream, StateValue) throws -> () public typealias Callback = (FileStream, Array) -> () private enum State { - case closed - case open(FileDescriptor) - case streaming(FileDescriptor, FileSource) + case idle + case streaming(FileSource) } - private struct Storage { - struct Callbacks { - var open: Array> = [] - var close: Array> = [] - var read: Array = [] + @dynamicMemberLookup + private final class Storage { + struct Values { + var state: State = .idle + var callbacks = Array() } - var state: State = .closed - var callbacks = Callbacks() - } - - public let filePath: FilePath + private let lock = DispatchQueue(label: "de.sersoft.filestreamer.filestream.storage.lock") + private var values = Values() - private let storageLock = DispatchQueue(label: "de.sersoft.filestreamer.storage.lock") - private var storage = Storage() + init() {} - public init(filePath: FilePath) { - self.filePath = filePath - } + deinit { + if case .streaming(let source) = values.state { + print("Warning: FileStream storage was deallocated without stopping streaming!") + source.cancel() + } + } - deinit { - do { - try close() - } catch { - print("Trying to close the file descriptor at \(filePath) on streamer deallocation failed: \(error)") + func withValues(do work: (inout Values) throws -> T) rethrows -> T { + dispatchPrecondition(condition: .notOnQueue(lock)) + return try lock.sync { try work(&values) } } - } - private func withStorage(do work: (inout Storage) throws -> T) rethrows -> T { - dispatchPrecondition(condition: .notOnQueue(storageLock)) - return try storageLock.sync { try work(&storage) } - } + func with(_ keyPath: WritableKeyPath, do work: (inout Value) throws -> T) rethrows -> T { + try withValues { try work(&$0[keyPath: keyPath]) } + } - private func withStorageValue(_ keyPath: WritableKeyPath, do work: (inout Value) throws -> T) rethrows -> T { - try withStorage { try work(&$0[keyPath: keyPath]) } + subscript(dynamicMember keyPath: KeyPath) -> Value { + withValues { $0[keyPath: keyPath] } + } } - private func getStorageValue(for keyPath: KeyPath) -> Value { - withStorage { $0[keyPath: keyPath] } - } + public let fileDescriptor: FileDescriptor - public func addOpenCallback(_ callback: @escaping StateCallback) { - withStorageValue(\.callbacks.open) { $0.append(callback) } - } + private let storage = Storage() - public func addCloseCallback(_ callback: @escaping StateCallback) { - withStorageValue(\.callbacks.close) { $0.append(callback) } + public init(fileDescriptor: FileDescriptor) { + self.fileDescriptor = fileDescriptor } public func addCallback(_ callback: @escaping Callback) { - withStorageValue(\.callbacks.read) { $0.append(callback) } - } - - public func open() throws { - try withStorage { - guard case .closed = $0.state else { return } - $0.state = try .open(_open(informing: $0.callbacks.open)) - } - } - - private func _open(informing callbacks: Array>) throws -> FileDescriptor { - let descriptor = try FileDescriptor.open(filePath, .readOnly) - do { - try callbacks.forEach { try $0(self, descriptor) } - } catch { - do { - try descriptor.close() - } catch { - print("State callback for opening file at \(filePath) threw an error and trying to close the file descriptor failed: \(error)") - } - throw error - } - return descriptor + storage.with(\.callbacks) { $0.append(callback) } } public func beginStreaming() throws { - try withStorage { - switch $0.state { - case .closed: - let fileDesc = try _open(informing: $0.callbacks.open) - $0.state = try .streaming(fileDesc, _beginStreaming(from: fileDesc)) - case .open(let fileDesc): - $0.state = try .streaming(fileDesc, _beginStreaming(from: fileDesc)) - case .streaming(_, _): return - } + try storage.with(\.state) { + guard case .idle = $0 else { return } + $0 = try .streaming(_beginStreaming(from: fileDescriptor)) } } private func _beginStreaming(from fileDesc: FileDescriptor) throws -> FileSource { - let workerQueue = DispatchQueue(label: "de.sersoft.filestreamer.worker") + let workerQueue = DispatchQueue(label: "de.sersoft.filestreamer.filestream.worker") let source = DispatchSource.makeReadSource(fileDescriptor: fileDesc.rawValue, queue: workerQueue) let rawSize = MemoryLayout.size var remainingData = 0 - source.setEventHandler { [unowned self] in + source.setEventHandler { [unowned storage] in do { remainingData += Int(source.data) guard case let capacity = remainingData / rawSize, capacity > 0 else { return } @@ -115,7 +76,7 @@ public final class FileStream { let bytesRead = try fileDesc.read(into: UnsafeMutableRawBufferPointer(buffer)) if case let noOfValues = bytesRead / rawSize, noOfValues > 0 { let values = Array(buffer.prefix(noOfValues)) - self.getStorageValue(for: \.callbacks.read).forEach { $0(self, values) } + storage.callbacks.forEach { $0(self, values) } } let leftOverBytes = bytesRead % rawSize remainingData -= bytesRead - leftOverBytes @@ -137,31 +98,14 @@ public final class FileStream { } public func endStreaming() throws { - try withStorageValue(\.state) { - guard case .streaming(let fileDesc, let source) = $0 else { return } + try storage.with(\.state) { + guard case .streaming(let source) = $0 else { return } try _endStreaming(of: source) - $0 = .open(fileDesc) + $0 = .idle } } private func _endStreaming(of source: FileSource) throws { source.cancel() } - - public func close() throws { - try withStorage { - switch $0.state { - case .closed: return - case .streaming(let fileDesc, let source): - try _endStreaming(of: source) - fallthrough - case .open(let fileDesc): try _close(fileDesc, informing: $0.callbacks.close) - } - $0.state = .closed - } - } - - private func _close(_ fileDesc: FileDescriptor, informing callbacks: Array>) throws { - try fileDesc.closeAfter { try callbacks.forEach { try $0(self, fileDesc) } } - } } diff --git a/Tests/FileStreamerTests/FileStreamTests.swift b/Tests/FileStreamerTests/FileStreamTests.swift index a74ca72..c27c07e 100644 --- a/Tests/FileStreamerTests/FileStreamTests.swift +++ b/Tests/FileStreamerTests/FileStreamTests.swift @@ -30,7 +30,7 @@ final class FileStreamTests: XCTestCase { let writingDesc = try FileDescriptor.open(file, .writeOnly, options: [.create, .truncate], permissions: [.ownerReadWrite, .groupReadWrite]) - let stream = FileStream(filePath: file) + let stream = try FileStream(fileDescriptor: .open(file, .readOnly)) stream.addCallback { stream, values in callbackCount += 1 collectedEvents.append(contentsOf: values) @@ -52,33 +52,10 @@ final class FileStreamTests: XCTestCase { } wait(for: [eventExpectation], timeout: 10) try writingDesc.close() - try stream.close() + try stream.endStreaming() + try stream.fileDescriptor.close() } XCTAssertEqual(collectedEvents, expectedEvents) XCTAssertLessThanOrEqual(callbackCount, expectedEvents.count) } - - func testStateCallbacks() throws { - try withTemporaryDirectory { dir in - let file = FilePath(dir.appendingPathComponent("streaming_file").path) - try FileDescriptor - .open(file, .writeOnly, - options: [.create, .truncate], - permissions: [.ownerReadWrite, .groupReadWrite]) - .close() - let stream = FileStream(filePath: file) - let openExpectation = expectation(description: "Waiting for open callback") - stream.addOpenCallback { _, _ in - openExpectation.fulfill() - } - let closeExpectation = expectation(description: "Waiting for close callback") - stream.addCloseCallback { _, _ in - closeExpectation.fulfill() - } - try stream.open() - wait(for: [openExpectation], timeout: 2) - try stream.close() - wait(for: [closeExpectation], timeout: 2) - } - } }