Skip to content

Commit

Permalink
Rework file stream to use file descriptors directly
Browse files Browse the repository at this point in the history
  • Loading branch information
ffried committed May 25, 2021
1 parent a2fdada commit 9af0592
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 122 deletions.
136 changes: 40 additions & 96 deletions Sources/FileStreamer/FileStream.swift
Original file line number Diff line number Diff line change
@@ -1,112 +1,73 @@
import Dispatch
import SystemPackage

public final class FileStream<Value> {
public struct FileStream<Value> {
private typealias FileSource = DispatchSourceRead

public typealias StateCallback<StateValue> = (FileStream, StateValue) throws -> ()
public typealias Callback = (FileStream, Array<Value>) -> ()

private enum State {
case closed
case open(FileDescriptor)
case streaming(FileDescriptor, FileSource)
case idle
case streaming(FileSource)
}

private struct Storage {
struct Callbacks {
var open: Array<StateCallback<FileDescriptor>> = []
var close: Array<StateCallback<FileDescriptor>> = []
var read: Array<Callback> = []
@dynamicMemberLookup
private final class Storage {
struct Values {
var state: State = .idle
var callbacks = Array<Callback>()
}

var state: State = .closed
var callbacks = Callbacks()
}

public let filePath: FilePath
private let lock = DispatchQueue(label: "de.sersoft.filestreamer.filestream.storage.lock")
private var values = Values()

private let storageLock = DispatchQueue(label: "de.sersoft.filestreamer.storage.lock")
private var storage = Storage()
init() {}

public init(filePath: FilePath) {
self.filePath = filePath
}
deinit {
if case .streaming(let source) = values.state {
print("Warning: FileStream storage was deallocated without stopping streaming!")
source.cancel()
}
}

deinit {
do {
try close()
} catch {
print("Trying to close the file descriptor at \(filePath) on streamer deallocation failed: \(error)")
func withValues<T>(do work: (inout Values) throws -> T) rethrows -> T {
dispatchPrecondition(condition: .notOnQueue(lock))
return try lock.sync { try work(&values) }
}
}

private func withStorage<T>(do work: (inout Storage) throws -> T) rethrows -> T {
dispatchPrecondition(condition: .notOnQueue(storageLock))
return try storageLock.sync { try work(&storage) }
}
func with<Value, T>(_ keyPath: WritableKeyPath<Values, Value>, do work: (inout Value) throws -> T) rethrows -> T {
try withValues { try work(&$0[keyPath: keyPath]) }
}

private func withStorageValue<Value, T>(_ keyPath: WritableKeyPath<Storage, Value>, do work: (inout Value) throws -> T) rethrows -> T {
try withStorage { try work(&$0[keyPath: keyPath]) }
subscript<Value>(dynamicMember keyPath: KeyPath<Values, Value>) -> Value {
withValues { $0[keyPath: keyPath] }
}
}

private func getStorageValue<Value>(for keyPath: KeyPath<Storage, Value>) -> Value {
withStorage { $0[keyPath: keyPath] }
}
public let fileDescriptor: FileDescriptor

public func addOpenCallback(_ callback: @escaping StateCallback<FileDescriptor>) {
withStorageValue(\.callbacks.open) { $0.append(callback) }
}
private let storage = Storage()

public func addCloseCallback(_ callback: @escaping StateCallback<FileDescriptor>) {
withStorageValue(\.callbacks.close) { $0.append(callback) }
public init(fileDescriptor: FileDescriptor) {
self.fileDescriptor = fileDescriptor
}

public func addCallback(_ callback: @escaping Callback) {
withStorageValue(\.callbacks.read) { $0.append(callback) }
}

public func open() throws {
try withStorage {
guard case .closed = $0.state else { return }
$0.state = try .open(_open(informing: $0.callbacks.open))
}
}

private func _open(informing callbacks: Array<StateCallback<FileDescriptor>>) throws -> FileDescriptor {
let descriptor = try FileDescriptor.open(filePath, .readOnly)
do {
try callbacks.forEach { try $0(self, descriptor) }
} catch {
do {
try descriptor.close()
} catch {
print("State callback for opening file at \(filePath) threw an error and trying to close the file descriptor failed: \(error)")
}
throw error
}
return descriptor
storage.with(\.callbacks) { $0.append(callback) }
}

public func beginStreaming() throws {
try withStorage {
switch $0.state {
case .closed:
let fileDesc = try _open(informing: $0.callbacks.open)
$0.state = try .streaming(fileDesc, _beginStreaming(from: fileDesc))
case .open(let fileDesc):
$0.state = try .streaming(fileDesc, _beginStreaming(from: fileDesc))
case .streaming(_, _): return
}
try storage.with(\.state) {
guard case .idle = $0 else { return }
$0 = try .streaming(_beginStreaming(from: fileDescriptor))
}
}

private func _beginStreaming(from fileDesc: FileDescriptor) throws -> FileSource {
let workerQueue = DispatchQueue(label: "de.sersoft.filestreamer.worker")
let workerQueue = DispatchQueue(label: "de.sersoft.filestreamer.filestream.worker")
let source = DispatchSource.makeReadSource(fileDescriptor: fileDesc.rawValue, queue: workerQueue)
let rawSize = MemoryLayout<Value>.size
var remainingData = 0
source.setEventHandler { [unowned self] in
source.setEventHandler { [unowned storage] in
do {
remainingData += Int(source.data)
guard case let capacity = remainingData / rawSize, capacity > 0 else { return }
Expand All @@ -115,7 +76,7 @@ public final class FileStream<Value> {
let bytesRead = try fileDesc.read(into: UnsafeMutableRawBufferPointer(buffer))
if case let noOfValues = bytesRead / rawSize, noOfValues > 0 {
let values = Array(buffer.prefix(noOfValues))
self.getStorageValue(for: \.callbacks.read).forEach { $0(self, values) }
storage.callbacks.forEach { $0(self, values) }
}
let leftOverBytes = bytesRead % rawSize
remainingData -= bytesRead - leftOverBytes
Expand All @@ -137,31 +98,14 @@ public final class FileStream<Value> {
}

public func endStreaming() throws {
try withStorageValue(\.state) {
guard case .streaming(let fileDesc, let source) = $0 else { return }
try storage.with(\.state) {
guard case .streaming(let source) = $0 else { return }
try _endStreaming(of: source)
$0 = .open(fileDesc)
$0 = .idle
}
}

private func _endStreaming(of source: FileSource) throws {
source.cancel()
}

public func close() throws {
try withStorage {
switch $0.state {
case .closed: return
case .streaming(let fileDesc, let source):
try _endStreaming(of: source)
fallthrough
case .open(let fileDesc): try _close(fileDesc, informing: $0.callbacks.close)
}
$0.state = .closed
}
}

private func _close(_ fileDesc: FileDescriptor, informing callbacks: Array<StateCallback<FileDescriptor>>) throws {
try fileDesc.closeAfter { try callbacks.forEach { try $0(self, fileDesc) } }
}
}
29 changes: 3 additions & 26 deletions Tests/FileStreamerTests/FileStreamTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ final class FileStreamTests: XCTestCase {
let writingDesc = try FileDescriptor.open(file, .writeOnly,
options: [.create, .truncate],
permissions: [.ownerReadWrite, .groupReadWrite])
let stream = FileStream<TestValue>(filePath: file)
let stream = try FileStream<TestValue>(fileDescriptor: .open(file, .readOnly))
stream.addCallback { stream, values in
callbackCount += 1
collectedEvents.append(contentsOf: values)
Expand All @@ -52,33 +52,10 @@ final class FileStreamTests: XCTestCase {
}
wait(for: [eventExpectation], timeout: 10)
try writingDesc.close()
try stream.close()
try stream.endStreaming()
try stream.fileDescriptor.close()
}
XCTAssertEqual(collectedEvents, expectedEvents)
XCTAssertLessThanOrEqual(callbackCount, expectedEvents.count)
}

func testStateCallbacks() throws {
try withTemporaryDirectory { dir in
let file = FilePath(dir.appendingPathComponent("streaming_file").path)
try FileDescriptor
.open(file, .writeOnly,
options: [.create, .truncate],
permissions: [.ownerReadWrite, .groupReadWrite])
.close()
let stream = FileStream<TestValue>(filePath: file)
let openExpectation = expectation(description: "Waiting for open callback")
stream.addOpenCallback { _, _ in
openExpectation.fulfill()
}
let closeExpectation = expectation(description: "Waiting for close callback")
stream.addCloseCallback { _, _ in
closeExpectation.fulfill()
}
try stream.open()
wait(for: [openExpectation], timeout: 2)
try stream.close()
wait(for: [closeExpectation], timeout: 2)
}
}
}

0 comments on commit 9af0592

Please sign in to comment.