Skip to content

Commit

Permalink
feat: Improve GameGestureStabilizationTransformer (#33)
Browse files Browse the repository at this point in the history
Co-authored-by: nicolantean <[email protected]>
  • Loading branch information
mirland and nicolantean authored Sep 26, 2024
1 parent 8cd52f3 commit 39a1ab9
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,34 +1,138 @@
import 'dart:async';
import 'dart:io';

import 'package:rxdart/rxdart.dart';
import 'package:simon_ai/core/common/logger.dart';
import 'package:simon_ai/core/model/hand_gesture_with_position.dart';
import 'package:simon_ai/core/model/hand_gestures.dart';

const _logEnabled = true;
const _logVerbose = false;

/// Transforms a stream of [HandGestureWithPosition] into a stream of game
/// [HandGestureWithPosition].
/// If the stream of gestures is consistent for a certain amount of time,
/// the transformer will emit the gesture.
class GameGestureStabilizationTransformer extends StreamTransformerBase<
HandGestureWithPosition, HandGestureWithPosition> {
final _gestureDetectionTime = const Duration(milliseconds: 400);
static final _defaultTimeSpan = Platform.isAndroid
? const Duration(milliseconds: 500)
: const Duration(seconds: 1);
static const _defaultWindowSize = 5;
static final _defaultMinWindowSize = Platform.isAndroid ? 3 : 5;
static final _defaultMaxUnrecognizedGesturesInWindow =
Platform.isAndroid ? 5 : 3;

final int _windowSize;
final int _minWindowSize;
final int _maxUnrecognizedGesturesInWindow;
final Duration _timeSpan;

late StreamSubscription<HandGestureWithPosition> _subscription;
late StreamController<List<HandGestureWithPosition>> _controller;

final List<HandGestureWithPosition> _buffer = [];
Timer? _timer;
final _windowTime = Stopwatch();
var _requireEmmit = false;
var _currentUnrecognizedGestures = 0;

GameGestureStabilizationTransformer({
int? maxUnrecognizedGesturesInWindow,
int? minWindowSize,
Duration? timeSpan,
int? windowSize,
}) : _timeSpan = timeSpan ?? _defaultTimeSpan,
_windowSize = windowSize = _defaultWindowSize,
_minWindowSize = minWindowSize ?? _defaultMinWindowSize,
_maxUnrecognizedGesturesInWindow = maxUnrecognizedGesturesInWindow ??
_defaultMaxUnrecognizedGesturesInWindow;

void _resetBuffer() {
_buffer.clear();
_timer?.cancel();
_timer = null;
_windowTime.reset();
_currentUnrecognizedGestures = 0;
}

void _emitBuffer() {
if (_buffer.isNotEmpty) {
if (_buffer.length >= _minWindowSize) {
_requireEmmit = false;
_controller.add(List.unmodifiable(_buffer));
if (_logEnabled) {
Logger.i(
"Emit gesture ${_buffer.first.gesture}, "
"bufferSize: ${_buffer.length}, "
"time: ${_windowTime.elapsedMilliseconds} millis",
);
}
} else {
_requireEmmit = true;
}
}
if (!_requireEmmit) {
_resetBuffer();
}
}

void _handleNewGesture(HandGestureWithPosition gestureWithPosition) {
if (gestureWithPosition.gesture == HandGesture.unrecognized) {
_currentUnrecognizedGestures++;
if (_currentUnrecognizedGestures >= _maxUnrecognizedGesturesInWindow) {
_resetBuffer();
if (_logVerbose) {
Logger.i("Max unrecognized gestures reached, reset window");
}
} else if (_logVerbose) {
Logger.i("Discard unrecognized gesture");
}
return;
}
if (_buffer.isEmpty) {
_windowTime.reset();
}

_buffer.add(gestureWithPosition);
_timer ??= Timer(_timeSpan, _emitBuffer);

final firstGesture = _buffer.first;
final isConsistent =
_buffer.every((gesture) => gesture.gesture == firstGesture.gesture);
if (!isConsistent) {
if (_logVerbose) {
Logger.i("Discard gesture ${firstGesture.gesture}");
}
_resetBuffer();
}

if (_buffer.length >= _windowSize || _requireEmmit) {
_emitBuffer();
}
}

@override
Stream<HandGestureWithPosition> bind(
Stream<HandGestureWithPosition> stream,
) =>
stream
.buffer(Stream.periodic(_gestureDetectionTime))
.asyncMap((bufferedGestures) {
if (bufferedGestures.isEmpty) return null;
final HandGestureWithPosition firstGesture = bufferedGestures.first;
final bool isConsistent = bufferedGestures
.every((gesture) => gesture.gesture == firstGesture.gesture);
return (isConsistent &&
firstGesture.gesture != HandGesture.unrecognized)
? firstGesture
: null;
})
.whereNotNull()
.distinct((previous, next) => previous.gesture == next.gesture)
.asBroadcastStream();
Stream<HandGestureWithPosition> bind(Stream<HandGestureWithPosition> stream) {
_controller = StreamController<List<HandGestureWithPosition>>(
onCancel: () => _subscription.cancel(),
onResume: () => _subscription.resume(),
onPause: () => _subscription.pause(),
);
_windowTime.start();

_subscription = stream.listen(
_handleNewGesture,
onError: _controller.addError,
onDone: () {
_emitBuffer();
_controller.close();
},
cancelOnError: false,
);

return _controller.stream
.map((bufferedGestures) => bufferedGestures.lastOrNull)
.whereNotNull();
}
}
4 changes: 3 additions & 1 deletion client/lib/core/repository/game_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ class GameManager {

Stream<HandGestureWithPosition> get gameSequenceStream =>
_gameSequenceController.stream
.transform(GameGestureStabilizationTransformer());
.transform(GameGestureStabilizationTransformer())
.distinct((previous, next) => previous.gesture == next.gesture)
.asBroadcastStream();

Stream<dynamic> get gestureStream => _gestureStreamController.stream;
late Stream<dynamic> _newFrameStream;
Expand Down
16 changes: 12 additions & 4 deletions client/lib/ui/extensions/stream_extensions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,25 @@ import 'dart:async';

extension StreamExtensions<T> on Stream<T> {
Stream<R> processWhileAvailable<R>(FutureOr<R> Function(T) asyncMapper) {
var isClosed = false;
T? lastUnprocessedValue;
var isProcessing = false;
// Define a separate async function to process the value.
Future<void> processValue(T value, EventSink<R> sink) async {
try {
sink.add(await asyncMapper(value));
T? lastValue;
while (lastUnprocessedValue != null) {
while (lastUnprocessedValue != null && !isClosed) {
lastValue = lastUnprocessedValue;
lastUnprocessedValue = null;
sink.add(await asyncMapper(lastValue as T));
if (isClosed) {
sink.add(await asyncMapper(lastValue as T));
}
}
} catch (e) {
sink.addError(e);
if (!isClosed) {
sink.addError(e);
}
isProcessing = false;
}
}
Expand All @@ -35,7 +40,10 @@ extension StreamExtensions<T> on Stream<T> {
isProcessing = false;
sink.addError(error, stackTrace);
},
handleDone: (EventSink<R> sink) => sink.close(),
handleDone: (EventSink<R> sink) {
sink.close();
isClosed = true;
},
),
);
}
Expand Down

0 comments on commit 39a1ab9

Please sign in to comment.