Skip to content

Commit

Permalink
Only allow one callback
Browse files Browse the repository at this point in the history
  • Loading branch information
ffried committed May 26, 2021
1 parent c480b40 commit ee445e2
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 22 deletions.
28 changes: 9 additions & 19 deletions Sources/FileStreamer/FileStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@ import SystemPackage
public struct FileStream<Value> {
private typealias FileSource = DispatchSourceRead
/// The callback that is called whenever values are read from the file.
public typealias Callback = (FileStream, Array<Value>) -> ()
public typealias Callback = (Array<Value>) -> ()

private enum State {
case idle
case streaming(FileSource)
}

@dynamicMemberLookup
private final class Storage {
struct Values {
var state: State = .idle
var callbacks = Array<Callback>()
}

private let lock = DispatchQueue(label: "de.sersoft.filestreamer.filestream.storage.lock")
Expand All @@ -39,38 +37,31 @@ public struct FileStream<Value> {
func with<Value, T>(_ keyPath: WritableKeyPath<Values, Value>, do work: (inout Value) throws -> T) rethrows -> T {
try withValues { try work(&$0[keyPath: keyPath]) }
}

subscript<Value>(dynamicMember keyPath: KeyPath<Values, Value>) -> Value {
withValues { $0[keyPath: keyPath] }
}
}

/// The file descriptor of this stream.
/// - Note: The file descriptor is not managed by this type. Opening and closing the file descriptor is the responsibility of the caller.
public let fileDescriptor: FileDescriptor

private let callback: Callback

private let storage = Storage()

/// Creates a new stream using the given file descriptor.
/// The descriptor should be open for reading!
/// - Parameter fileDescriptor: The file descriptor to use.
/// - Parameter callback: The callback to call when values are read.
/// - Note: The file descriptor is not managed by this type. Opening and closing the file descriptor is the responsibility of the caller.
public init(fileDescriptor: FileDescriptor) {
public init(fileDescriptor: FileDescriptor, callback: @escaping Callback) {
self.fileDescriptor = fileDescriptor
}

/// Adds a callback to be called for read events.
/// - Parameter callback: The callback to call for newly read events.
/// - Note: Callbacks are executed in the order they are added.
public func addCallback(_ callback: @escaping Callback) {
storage.with(\.callbacks) { $0.append(callback) }
self.callback = callback
}

/// Starts streaming values from the given file descriptor.
public func beginStreaming() {
storage.with(\.state) {
guard case .idle = $0 else { return }
$0 = .streaming(_beginStreaming(from: fileDescriptor))
$0 = .streaming(_beginStreaming(from: fileDescriptor))
}
}

Expand All @@ -79,16 +70,15 @@ public struct FileStream<Value> {
let source = DispatchSource.makeReadSource(fileDescriptor: fileDesc.rawValue, queue: workerQueue)
let rawSize = MemoryLayout<Value>.size
var remainingData = 0
source.setEventHandler { [unowned storage] in
source.setEventHandler { [callback] in
do {
remainingData += Int(source.data)
guard case let capacity = remainingData / rawSize, capacity > 0 else { return }
let buffer = UnsafeMutableBufferPointer<Value>.allocate(capacity: capacity)
defer { buffer.deallocate() }
let bytesRead = try fileDesc.read(into: UnsafeMutableRawBufferPointer(buffer))
if case let noOfValues = bytesRead / rawSize, noOfValues > 0 {
let values = Array(buffer.prefix(noOfValues))
storage.callbacks.forEach { $0(self, values) }
callback(Array(buffer.prefix(noOfValues)))
}
let leftOverBytes = bytesRead % rawSize
remainingData -= bytesRead - leftOverBytes
Expand Down
5 changes: 2 additions & 3 deletions Tests/FileStreamerTests/FileStreamTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ final class FileStreamTests: XCTestCase {
let writingDesc = try FileDescriptor.open(file, .writeOnly,
options: [.create, .truncate],
permissions: [.ownerReadWrite, .groupReadWrite])
let stream = try FileStream<TestValue>(fileDescriptor: .open(file, .readOnly))
stream.addCallback { stream, values in
let stream = try FileStream<TestValue>(fileDescriptor: .open(file, .readOnly)) {
callbackCount += 1
collectedEvents.append(contentsOf: values)
collectedEvents.append(contentsOf: $0)
if collectedEvents.count >= expectedEvents.count {
eventExpectation.fulfill()
}
Expand Down

0 comments on commit ee445e2

Please sign in to comment.