Skip to content

Commit

Permalink
Ball Box Cleaning Complete
Browse files Browse the repository at this point in the history
- Removed acceleration strategy, did not work well
- Modified spacial / teleport detection to look at range of frames instead of adjacent frames
  • Loading branch information
dweizzz committed Dec 3, 2023
1 parent cc17837 commit f04f39b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 65 deletions.
Binary file added data/trimmed.mov
Binary file not shown.
119 changes: 54 additions & 65 deletions src/processing/trendline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from state import GameState, BallFrame, Box
import cv2
import numpy as np

class LinearTrendline:
def __init__(self, state: GameState, args):
Expand All @@ -21,6 +19,7 @@ def calculate_velocity(self):
x_center2, y_center2 = frame2.ball.box.center()

time_diff = 1 / self.fps
time_diff = 1
vx = (x_center2 - x_center1) / time_diff
vy = (y_center2 - y_center1) / time_diff

Expand All @@ -31,6 +30,7 @@ def calculate_velocity(self):

avg_vx = sum(v[0] for v in velocities) / len(velocities)
avg_vy = sum(v[1] for v in velocities) / len(velocities)
# print(f"Frame {i}: Velocity - vx: {avg_vx:.2f}, vy: {avg_vy:.2f}")

frame2.ball.vx = avg_vx
frame2.ball.vy = avg_vy
Expand Down Expand Up @@ -81,72 +81,61 @@ def create_predicted_box(self, x_center, y_center, ball_size=20):

return Box(xmin_pred, ymin_pred, xmax_pred, ymax_pred, predicted=True)

def calculate_acceleration(self):
for i in range(2, len(self.state.frames)):
frame0 = self.state.frames[i - 2]
frame1 = self.state.frames[i - 1]
frame2 = self.state.frames[i]

if frame0.ball and frame1.ball and frame2.ball:
vx1 = frame1.ball.vx
vy1 = frame1.ball.vy

vx2 = frame2.ball.vx
vy2 = frame2.ball.vy

time_diff = 1 / self.fps

ax = (vx2 - vx1) / time_diff
ay = (vy2 - vy1) / time_diff

frame2.ball.ax = ax
frame2.ball.ay = ay

def detect_abrupt_changes(self, acceleration_threshold=500):
for i in range(len(self.state.frames)):
frame = self.state.frames[i]
if frame.ball and hasattr(frame.ball, 'ax') and hasattr(frame.ball, 'ay'):
# Dynamic threshold
dynamic_threshold = acceleration_threshold * (1 + (abs(frame.ball.vx) + abs(frame.ball.vy)) / 100)
if abs(frame.ball.ax) > dynamic_threshold or abs(frame.ball.ay) > dynamic_threshold:
frame.ball = None

def is_spatial_change_abrupt(self, frame1, frame2, spatial_threshold=30):
if frame1.ball and frame2.ball:
x1, y1 = frame1.ball.box.center()
x2, y2 = frame2.ball.box.center()

distance = ((x2 - x1) ** 2 + (y2 - y1) ** 2) ** 0.5
return distance > spatial_threshold

# def track_optical_flow(self):
# # Initialize variables for optical flow
# prev_frame = None
# prev_ball_position = None

# # Process each frame
# for i, current_frame in enumerate(self.state.frames):
# if current_frame.ball:
# current_ball_position = np.array([[current_frame.ball.box.center()]], dtype=np.float32)

# if prev_frame is not None and prev_ball_position is not None:
# # Calculate optical flow
# new_position, status, error = cv2.calcOpticalFlowPyrLK(prev_frame, current_frame.image, prev_ball_position, None)

# if status[0][0] == 1: # Check if the flow was found
# current_frame.ball.box.update_center(new_position[0][0])

# # Update the previous frame and ball position
# prev_frame = current_frame.image.copy()
# prev_ball_position = current_ball_position.copy()
# tried using acceleration, didn't work
# def calculate_acceleration(self):
# for i in range(2, len(self.state.frames)):
# frame0 = self.state.frames[i - 2]
# frame1 = self.state.frames[i - 1]
# frame2 = self.state.frames[i]

# if frame0.ball and frame1.ball and frame2.ball:
# vx1 = frame1.ball.vx
# vy1 = frame1.ball.vy

# vx2 = frame2.ball.vx
# vy2 = frame2.ball.vy

# time_diff = 1 / self.fps
# time_diff = 1

# ax = (vx2 - vx1) / time_diff
# ay = (vy2 - vy1) / time_diff
# # print(f"Frame {i}: Acceleration - ax: {ax:.2f}, ay: {ay:.2f}")

# frame2.ball.ax = ax
# frame2.ball.ay = ay

# def detect_abrupt_changes(self, acceleration_threshold=500):
# for i in range(len(self.state.frames)):
# frame = self.state.frames[i]
# if frame.ball and hasattr(frame.ball, 'ax') and hasattr(frame.ball, 'ay'):
# # Dynamic threshold
# dynamic_threshold = acceleration_threshold * (1 + (abs(frame.ball.vx) + abs(frame.ball.vy)) / 100)
# if abs(frame.ball.ax) > dynamic_threshold or abs(frame.ball.ay) > dynamic_threshold:
# frame.ball = None

def is_spatial_change_abrupt(self, current_frame, spatial_threshold=70, window_size=15):
if current_frame.ball:
x2, y2 = current_frame.ball.box.center()

# Look back up to 30 frames
for j in range(1, min(window_size + 1, current_frame.frameno)):
prev_frame = self.state.frames[current_frame.frameno - j]
if prev_frame.ball:
x1, y1 = prev_frame.ball.box.center()

distance = ((x2 - x1) ** 2 + (y2 - y1) ** 2) ** 0.5
if distance > spatial_threshold:
print(f"Frame {current_frame.frameno}: Abrupt spatial change detected. Distance: {distance:.2f}")
return True
break

return False

def process(self):
self.calculate_velocity()
self.calculate_acceleration()
for i in range(1, len(self.state.frames)):
if self.is_spatial_change_abrupt(self.state.frames[i - 1], self.state.frames[i]):
for i in range(len(self.state.frames)):
if self.is_spatial_change_abrupt(self.state.frames[i]):
self.state.frames[i].ball = None
# self.track_optical_flow()
self.detect_abrupt_changes()
self.estimate_missing_positions()
return self.state

0 comments on commit f04f39b

Please sign in to comment.