diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 46c3980e..a6c367e9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,4 +27,4 @@ jobs: run: echo $DATA >> /home/runner/work/Ball-101/Ball-101/.env - name: Run CI tests run: | - python src/processing/shot_detect.py + python src/main.py data/training_data.mp4 diff --git a/.gitignore b/.gitignore index 5dd8186b..a2d526c2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ venv .env __pycache__ +.DS_Store +.vscode/ +tmp/ diff --git a/README.md b/README.md index 3f86b05f..05cb662a 100644 --- a/README.md +++ b/README.md @@ -39,8 +39,7 @@ Enable AWS connection by pasting the .env file into the repo. Start the server backend by running ``` -cd src/api -uvicorn backend:app --reload +uvicorn src.api/.backend:app --reload ``` Open a new bash terminal and start the frontend by running diff --git a/data/stable_jerry.mp4 b/data/stable_jerry.mp4 new file mode 100644 index 00000000..1428678f Binary files /dev/null and b/data/stable_jerry.mp4 differ diff --git a/data/true_map.png b/data/true_map.png index 1cab50cc..952ddd2f 100644 Binary files a/data/true_map.png and b/data/true_map.png differ diff --git a/requirements.txt b/requirements.txt index 80ce400d..a947937e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -44,6 +44,9 @@ yapf isort==4.3.21 imageio +# Processing +scikit-learn + # View streamlit>=1.18.1 hydralit_components>= 1.0.10 diff --git a/src/main.py b/src/main.py index 67fbd10d..e0a07db6 100644 --- a/src/main.py +++ b/src/main.py @@ -4,12 +4,14 @@ import yaml from modelrunner import ModelRunner from processrunner import ProcessRunner + # before main is called: # frontend boots up, awaits user to upload a video # upload button triggers backend call to upload video to s3 # fetch the video from cloud and download to tmp/uploaded_video.mp4 # calls main + # load in configs from config.yaml # initialise modelrunner and processrunner # feed video into the yolo model, pass into modelrunner @@ -20,7 +22,7 @@ def load_config(path): """ TODO Loads the config yaml file to read in parameters and settings. """ - with open(path, 'r') as file: + with open(path, "r") as file: config = yaml.safe_load(file) return config @@ -31,21 +33,31 @@ def main(video_path): Input: Path of the user uploaded video. Returns: Results of the processing, in string format TODO change to csv/json? """ - config = load_config('config.yaml') - model_vars = config['model_vars'] + config = load_config("config.yaml") + model_vars = config["model_vars"] modelrunner = ModelRunner(video_path, model_vars) modelrunner.run() people_output, ball_output = modelrunner.fetch_output() - output_video_path = 'tmp/court_video.mp4' - output_video_path_reenc = 'tmp/court_video_reenc.mp4' + output_video_path = "tmp/court_video.mp4" + output_video_path_reenc = "tmp/court_video_reenc.mp4" - processrunner = ProcessRunner(video_path, people_output, ball_output, output_video_path, - output_video_path_reenc) + processrunner = ProcessRunner( + video_path, + people_output, + ball_output, + output_video_path, + output_video_path_reenc, + ) processrunner.run() results = processrunner.get_results() return results -if __name__ == '__main__': - main('data/training_data.mp4') +if __name__ == "__main__": + import sys + + if len(sys.argv) <= 1: + main("tmp/training_data.mp4") + else: + main(sys.argv[1]) # Pass the first command-line argument to the main function diff --git a/src/processing/courtline_detect.py b/src/processing/court.py similarity index 53% rename from src/processing/courtline_detect.py rename to src/processing/court.py index 4bc7aa13..3ee1f971 100644 --- a/src/processing/courtline_detect.py +++ b/src/processing/court.py @@ -1,6 +1,6 @@ -''' +""" Court Detection and Rendering Module -''' +""" import os import cv2 as cv import numpy as np @@ -8,8 +8,11 @@ class Bin: - 'Bin to store color ranges of two channels' - def __init__(self, value, one_lower:int, one_upper:int, two_lower:int, two_upper:int): + "Bin to store color ranges of two channels" + + def __init__( + self, value, one_lower: int, one_upper: int, two_lower: int, two_upper: int + ): self.one_lower = one_lower self.one_upper = one_upper self.two_lower = two_lower @@ -17,65 +20,72 @@ def __init__(self, value, one_lower:int, one_upper:int, two_lower:int, two_upper self.value = value def __str__(self): - return 'Bin('+str(round(self.value,3))+','+str(self.one_lower)+','+str(self.one_upper)+','+str(self.two_lower)+','+str(self.two_upper)+')' + return ( + "Bin(" + + str(round(self.value, 3)) + + "," + + str(self.one_lower) + + "," + + str(self.one_upper) + + "," + + str(self.two_lower) + + "," + + str(self.two_upper) + + ")" + ) class Render: - ''' + """ Object which, given video input and state data, will produce video court visualization of player positions. - ''' - def __init__(self, video_path:str, display_images:bool=False): - ''' + """ + + def __init__(self, video_path: str, display_images: bool = False): + """ Runs court detection on video input @param video_path is path from project root to video file @param display_images determines whether or not to display images for debugging - ''' - self._TRUE_PATH = os.path.join('data','true_map.png') + """ + self._TRUE_PATH = os.path.join("data", "true_map.png") self._VIDEO_PATH = video_path self._BINNING_THRESHOLD = 0.001 - 'Minimum percentage of pixels to be included in list of color bins' + "Minimum percentage of pixels to be included in list of color bins" self._COLOR_SMOOTHING = 5 - 'Padding to be added to selected HSV to include more color range' - self._HALF_COURT_BOUNDS = np.array([(2043,1920),(2043,35),(37,35),(37,1920)]) - 'Coordinates of four corners of truth map counterclock starting from bottom right' - self._BOX_BOUNDS = np.array([(1277,798),(1277,35),(803,35),(803,798)]) - 'Coordinates of four corners of inner box counterclock starting from bottom right' + "Padding to be added to selected HSV to include more color range" + self._HALF_COURT_BOUNDS = np.array( + [(1022, 962), (1022, 18), (19, 18), (19, 962)] + ) + "Coordinates of four corners of truth map counterclock starting from bottom right" + self._BOX_BOUNDS = np.array([(639, 398), (639, 17), (402, 17), (402, 398)]) + "Coordinates of four corners of inner box counterclock starting from bottom right" video = cv.VideoCapture(self._VIDEO_PATH) _, self._BGR_COURT = video.read() - self._YCRCB_COURT = cv.cvtColor(self._BGR_COURT,cv.COLOR_BGR2YCrCb) - 'User court in YCrCb color space' + self._YCRCB_COURT = cv.cvtColor(self._BGR_COURT, cv.COLOR_BGR2YCrCb) + "User court in YCrCb color space" self._HSV_COURT = cv.cvtColor(self._BGR_COURT, cv.COLOR_BGR2HSV) - 'User court in HSV color space' + "User court in HSV color space" self._GRAY_COURT = cv.cvtColor(self._BGR_COURT, cv.COLOR_BGR2GRAY) - 'User court in gray scale' - self._MASK_COURT_EDGES = self._GRAY_COURT.copy() # temp assignment - 'Final processed image of court edges' - self._TRUTH_COURT_MAP = cv.imread(self._TRUE_PATH,cv.IMREAD_GRAYSCALE) - 'True court map of half court' - + "User court in gray scale" + self._MASK_COURT_EDGES = self._GRAY_COURT.copy() # temp assignment + "Final processed image of court edges" + self._TRUTH_COURT_MAP = cv.imread(self._TRUE_PATH, cv.IMREAD_GRAYSCALE) + "True court map of half court" - # Downsample truth map - height, width = self._TRUTH_COURT_MAP.shape[:2] - new_height = int(height/2) - new_width = int(width/2) - self._BOX_BOUNDS = self._BOX_BOUNDS/2 - self._TRUTH_COURT_MAP = cv.resize(self._TRUTH_COURT_MAP, (new_width, new_height)) - - self._HSV_BINNING = True # choose either HSV binning or YCrCb binning + self._HSV_BINNING = True # choose either HSV binning or YCrCb binning if self._HSV_BINNING: - self._index = (0,1) + self._index = (0, 1) self._one_max = 180.0 self._two_max = 256.0 self._COURT_IMG = self._HSV_COURT else: - self._index = (1,2) + self._index = (1, 2) self._one_max = 256.0 self._two_max = 256.0 self._COURT_IMG = self._YCRCB_COURT self._HOMOGRAPHY = None - 'Homography matrix to transform court to minimaped version' + "Homography matrix to transform court to minimaped version" if display_images: self._HOMOGRAPHY = self._detect_courtlines_and_display() @@ -85,15 +95,14 @@ def __init__(self, video_path:str, display_images:bool=False): def get_homography(self): return self._HOMOGRAPHY - def _detect_courtlines(self): - 'Finds best homography' + "Finds best homography" bins = self._bin_pixels(self._COURT_IMG, one_bins=18, two_bins=10) mask = self._get_mask(self._COURT_IMG, bins[0]) canny_edges = self._get_canny(self._GRAY_COURT) masked_edges = self._apply_mask(canny_edges, mask) - hough_lines = self._get_hough(masked_edges,threshold=180) - thick_masked_edges = self._thicken_edges(masked_edges,iterations=1) + hough_lines = self._get_hough(masked_edges, threshold=180) + thick_masked_edges = self._thicken_edges(masked_edges, iterations=1) self._MASK_COURT_EDGES = thick_masked_edges.copy() best_pts = self._find_best_homography(hough_lines) while True: @@ -102,283 +111,329 @@ def _detect_courtlines(self): while True: if not self._fine_regress_box_boundary(best_pts): break - homography, _ = cv.findHomography(np.array(best_pts),self._BOX_BOUNDS) + homography, _ = cv.findHomography(np.array(best_pts), self._BOX_BOUNDS) return homography def _detect_courtlines_and_display(self): - 'Finds best homography and displays images of progress' + "Finds best homography and displays images of progress" bins = self._bin_pixels(self._COURT_IMG, one_bins=18, two_bins=10) mask = self._get_mask(self._COURT_IMG, bins[0]) canny_edges = self._get_canny(self._GRAY_COURT) masked_edges = self._apply_mask(canny_edges, mask) masked_bgr = self._apply_mask(self._BGR_COURT, mask) - hough_lines = self._get_hough(masked_edges,threshold=180) + hough_lines = self._get_hough(masked_edges, threshold=180) hough = self._apply_hough(self._BGR_COURT, hough_lines) - thick_masked_edges = self._thicken_edges(masked_edges,iterations=1) + thick_masked_edges = self._thicken_edges(masked_edges, iterations=1) self._MASK_COURT_EDGES = thick_masked_edges.copy() best_pts = self._find_best_homography(hough_lines) while self._regress_box_boundary(best_pts): - print('new goodness',self._evaluate_homography(best_pts,self._BOX_BOUNDS)) - print('to fine tuning') + print("new goodness", self._evaluate_homography(best_pts, self._BOX_BOUNDS)) + print("to fine tuning") while self._fine_regress_box_boundary(best_pts): - print('new goodness',self._evaluate_homography(best_pts,self._BOX_BOUNDS)) - homography, _ = cv.findHomography(np.array(best_pts),self._BOX_BOUNDS) + print("new goodness", self._evaluate_homography(best_pts, self._BOX_BOUNDS)) + homography, _ = cv.findHomography(np.array(best_pts), self._BOX_BOUNDS) test_bgr = self._BGR_COURT.copy() - color_map = cv.cvtColor(self._TRUTH_COURT_MAP,cv.COLOR_GRAY2BGR) - colors = [(0,0,255),(0,255,0),(255,0,0), (255,255,0)] - for i in range(0,4): + color_map = cv.cvtColor(self._TRUTH_COURT_MAP, cv.COLOR_GRAY2BGR) + colors = [(0, 0, 255), (0, 255, 0), (255, 0, 0), (255, 255, 0)] + for i in range(0, 4): pt = best_pts[i] - cv.circle(test_bgr,(int(pt[0]),int(pt[1])), 5, colors[i], -1) + cv.circle(test_bgr, (int(pt[0]), int(pt[1])), 5, colors[i], -1) pt = self._BOX_BOUNDS[i] - cv.circle(color_map,(int(pt[0]),int(pt[1])), 10, colors[i], -1) - new_img = self._apply_bgr_homography(self._BGR_COURT,best_pts) - new_gray_img = self._apply_gray_homography(self._MASK_COURT_EDGES,best_pts,or_mask=True) - second_gray_img = self._apply_gray_homography(self._MASK_COURT_EDGES,best_pts,or_mask=False) - - - cv.imshow('original', self._BGR_COURT) - cv.imshow('mask', mask) - cv.imshow('canny', canny_edges) - cv.imshow('canny masked', masked_edges) - cv.imshow('bgr masked', masked_bgr) - cv.imshow('hough transform', hough) - cv.imshow('new test', new_img) - cv.imshow('gray union', new_gray_img) - cv.imshow('gray intersection', second_gray_img) - cv.imshow('points image', test_bgr) - cv.imshow('true map',color_map) + cv.circle(color_map, (int(pt[0]), int(pt[1])), 10, colors[i], -1) + new_img = self._apply_bgr_homography(self._BGR_COURT, best_pts) + new_gray_img = self._apply_gray_homography( + self._MASK_COURT_EDGES, best_pts, or_mask=True + ) + second_gray_img = self._apply_gray_homography( + self._MASK_COURT_EDGES, best_pts, or_mask=False + ) + + cv.imshow("original", self._BGR_COURT) + cv.imshow("mask", mask) + cv.imshow("canny", canny_edges) + cv.imshow("canny masked", masked_edges) + cv.imshow("bgr masked", masked_bgr) + cv.imshow("hough transform", hough) + cv.imshow("new test", new_img) + cv.imshow("gray union", new_gray_img) + cv.imshow("gray intersection", second_gray_img) + cv.imshow("points image", test_bgr) + cv.imshow("true map", color_map) # self._test_many_bins(self._COURT_IMG,self._BGR_COURT,bins,iterations=6) - if cv.waitKey(0) & 0xff == 27: + if cv.waitKey(0) & 0xFF == 27: cv.destroyAllWindows() return homography - def _bin_pixels(self,img:np.ndarray, one_bins:int=16, two_bins:int=16): - ''' + def _bin_pixels(self, img: np.ndarray, one_bins: int = 16, two_bins: int = 16): + """ Returns top bins from YCrCb color space @Param: img, image of court, either HSV or YCrCb depending on settings @param one_bins, number of bins of first channel @param two_bins, number of bins of second channel @returns: sorted array of most frequent color bin objects - ''' + """ # generate weights - weights = np.zeros((img.shape[0],img.shape[1])) + weights = np.zeros((img.shape[0], img.shape[1])) for row in range(weights.shape[0]): row_weight = np.full(weights.shape[1], row) weights[row] = row_weight for col in range(weights.shape[1]): - col_weight = np.full(weights.shape[0], min(col+1,weights.shape[1]-col)) - weights[:,col] = np.minimum(weights[:,col],col_weight) - weights = weights/np.sum(weights) + col_weight = np.full(weights.shape[0], min(col + 1, weights.shape[1] - col)) + weights[:, col] = np.minimum(weights[:, col], col_weight) + weights = weights / np.sum(weights) # split image pixels into bins - bins = np.zeros((one_bins,two_bins)) + bins = np.zeros((one_bins, two_bins)) one_step = self._one_max / one_bins two_step = self._two_max / two_bins for row in range(img.shape[0]): for col in range(img.shape[1]): - pix = img[row,col] - bins[int(pix[self._index[0]]/one_step), - int(pix[self._index[1]]/two_step)] += weights[row,col] + pix = img[row, col] + bins[ + int(pix[self._index[0]] / one_step), + int(pix[self._index[1]] / two_step), + ] += weights[row, col] # sort bins top_bins = [] for row in range(bins.shape[0]): for col in range(bins.shape[1]): - if (bins[row,col] <= self._BINNING_THRESHOLD): + if bins[row, col] <= self._BINNING_THRESHOLD: continue - top_bins.append(Bin(bins[row,col], - int(round(one_step*row)), - int(round(one_step*(row+1))), - int(round(two_step*col)), - int(round(two_step*(col+1))))) + top_bins.append( + Bin( + bins[row, col], + int(round(one_step * row)), + int(round(one_step * (row + 1))), + int(round(two_step * col)), + int(round(two_step * (col + 1))), + ) + ) return sorted(top_bins, reverse=True, key=lambda bin: bin.value) - def _get_mask(self,img:np.ndarray, bin:Bin, morph:bool=True): - ''' + def _get_mask(self, img: np.ndarray, bin: Bin, morph: bool = True): + """ Applies color range from specified bin @param img, image of court in same color space from Bin @param bin, bin of colors to mask image over @param morph, whether or not to process image to eliminate noise and holes @returns masked image - ''' + """ # get mask - lowerbound = np.full(3,0) - upperbound = np.full(3,255) - lowerbound[self._index[0]] = bin.one_lower - self._COLOR_SMOOTHING*1 - lowerbound[self._index[1]] = bin.two_lower - self._COLOR_SMOOTHING*1 - upperbound[self._index[0]] = bin.one_upper + self._COLOR_SMOOTHING*0 - upperbound[self._index[1]] = bin.two_upper + self._COLOR_SMOOTHING*0 + lowerbound = np.full(3, 0) + upperbound = np.full(3, 255) + lowerbound[self._index[0]] = bin.one_lower - self._COLOR_SMOOTHING * 1 + lowerbound[self._index[1]] = bin.two_lower - self._COLOR_SMOOTHING * 1 + upperbound[self._index[0]] = bin.one_upper + self._COLOR_SMOOTHING * 0 + upperbound[self._index[1]] = bin.two_upper + self._COLOR_SMOOTHING * 0 mask = cv.inRange(img, lowerbound, upperbound) # close and open mask if morph: - kernel = np.ones((3,3),np.uint8) - mask = cv.morphologyEx(mask,cv.MORPH_CLOSE,kernel,iterations=8) #get all court - mask = cv.morphologyEx(mask,cv.MORPH_OPEN,kernel,iterations=30) #remove distractions + kernel = np.ones((3, 3), np.uint8) + mask = cv.morphologyEx( + mask, cv.MORPH_CLOSE, kernel, iterations=8 + ) # get all court + mask = cv.morphologyEx( + mask, cv.MORPH_OPEN, kernel, iterations=30 + ) # remove distractions return mask - def _apply_mask(self,img:np.ndarray, mask:np.ndarray): - ''' + def _apply_mask(self, img: np.ndarray, mask: np.ndarray): + """ Applies bitwise and mask @param img, image to mask over @param mask, mask, should be in grayscale @returns masked image - ''' - return cv.bitwise_and(img,img,mask=mask) + """ + return cv.bitwise_and(img, img, mask=mask) - def _get_canny(self,img:np.ndarray, threshold1:int=10, threshold2:int=100): - ''' + def _get_canny(self, img: np.ndarray, threshold1: int = 10, threshold2: int = 100): + """ Applies Canny edge detection to image @param img, image of court in grayscale @param threshold1, threshold2, thresholds for Canny @return image of court edges - ''' - return cv.Canny(img,threshold1,threshold2) + """ + return cv.Canny(img, threshold1, threshold2) - def _thicken_edges(self,img:np.ndarray, iterations:int=2): - ''' + def _thicken_edges(self, img: np.ndarray, iterations: int = 2): + """ Thickens the edges in an image @param img, grayscale image of court edges @param iterations, severity of thickness @returns image with thicker court edges - ''' - kernel = np.ones((3,3),np.uint8) - return cv.morphologyEx(img,cv.MORPH_DILATE,kernel,iterations=iterations) - - def _get_hough(self,img:np.ndarray,rho:float=1,theta:float=np.pi/180,threshold:int=200): - ''' + """ + kernel = np.ones((3, 3), np.uint8) + return cv.morphologyEx(img, cv.MORPH_DILATE, kernel, iterations=iterations) + + def _get_hough( + self, + img: np.ndarray, + rho: float = 1, + theta: float = np.pi / 180, + threshold: int = 200, + ): + """ Performs hough transform on image @param img, 8-bit grayscale image of court edges @returns list of lines detected given as rho and theta - ''' + """ return cv.HoughLines(img, rho, theta, threshold) - def _find_best_homography(self,lines:list): - ''' + def _find_best_homography(self, lines: list): + """ Finds best homography given list of lines @param lines, list of lines given by Hough Transform @returns list of four points corresponding to box boundary - ''' + """ # divide into two classes of lines, sorted by rho # TODO: sort lines using k-means, k=2 - hor = lines[lines[:,0,1]<=np.pi/2] # baseline - ver = lines[lines[:,0,1]>np.pi/2] # sideline - hor = np.array(sorted(hor, key = lambda x : x[0][0])) - ver = np.array(sorted(ver, key = lambda x : x[0][0])) - return self._iterate_best_homography(hor,ver) - - def _iterate_best_homography(self,hor:list,ver:list,relax_factor=0): - ''' + hor = lines[lines[:, 0, 1] <= np.pi / 2] # baseline + ver = lines[lines[:, 0, 1] > np.pi / 2] # sideline + hor = np.array(sorted(hor, key=lambda x: x[0][0])) + ver = np.array(sorted(ver, key=lambda x: x[0][0])) + return self._iterate_best_homography(hor, ver) + + def _iterate_best_homography(self, hor: list, ver: list, relax_factor=0): + """ Iterates until best homography is found @param hor, ver, lists of lines grouped into two categories, likely to be lines parallel to each other. @param relax_factor, how much to relax constraints on homography points. @return four points, or None if no homography is found. - ''' + """ max_goodness = 0 max_homography = None - for i1 in range(0,len(hor)): - for i2 in range(i1+1,len(hor)): + for i1 in range(0, len(hor)): + for i2 in range(i1 + 1, len(hor)): for j1 in range(len(ver)): - for j2 in range(j1+1,len(ver)): - pts = self._get_four_intersections(hor[i2][0],ver[j1][0],hor[i1][0], - ver[j2][0],relax_factor=relax_factor) + for j2 in range(j1 + 1, len(ver)): + pts = self._get_four_intersections( + hor[i2][0], + ver[j1][0], + hor[i1][0], + ver[j2][0], + relax_factor=relax_factor, + ) if pts is None: continue - goodness = self._evaluate_homography(pts,self._BOX_BOUNDS) - if (goodness > max_goodness): + goodness = self._evaluate_homography(pts, self._BOX_BOUNDS) + if goodness > max_goodness: max_goodness = goodness max_homography = pts - pts = self._get_four_intersections(ver[j2][0],hor[i2][0],ver[j1][0], - hor[i1][0],relax_factor=relax_factor) + pts = self._get_four_intersections( + ver[j2][0], + hor[i2][0], + ver[j1][0], + hor[i1][0], + relax_factor=relax_factor, + ) if pts is None: continue - goodness = self._evaluate_homography(pts,self._BOX_BOUNDS) - if (goodness > max_goodness): + goodness = self._evaluate_homography(pts, self._BOX_BOUNDS) + if goodness > max_goodness: max_goodness = goodness max_homography = pts if max_homography is None: if relax_factor > 2: return None - return self._iterate_best_homography(hor,ver,relax_factor=relax_factor+0.25) + return self._iterate_best_homography( + hor, ver, relax_factor=relax_factor + 0.25 + ) return max_homography - def _evaluate_homography(self,pts_src:list,pts_dst:list): - ''' + def _evaluate_homography(self, pts_src: list, pts_dst: list): + """ Evalues how well a homography performs on court @param pts_src, four points on court image of box, counterclockwise @param pts_src, four points on true image of court to map toz @return goodness, proportion of intersection. - ''' - assert(pts_src is not None) - mapped_edge_img = self._apply_gray_homography(self._MASK_COURT_EDGES,pts_src,pts_dst=pts_dst) - total_max_overlap = self._max_pixel_overlap(self._MASK_COURT_EDGES,pts_src,pts_dst=pts_dst) + """ + assert pts_src is not None + mapped_edge_img = self._apply_gray_homography( + self._MASK_COURT_EDGES, pts_src, pts_dst=pts_dst + ) + total_max_overlap = self._max_pixel_overlap( + self._MASK_COURT_EDGES, pts_src, pts_dst=pts_dst + ) goodness = float(np.count_nonzero(mapped_edge_img > 100)) / total_max_overlap return goodness - def _get_four_intersections(self,l1:list,l2:list,l3:list,l4:list,relax_factor=0): - ''' + def _get_four_intersections( + self, l1: list, l2: list, l3: list, l4: list, relax_factor=0 + ): + """ Gets four points from intersection of four lines. @param l1, line by free throw line @param l2, l3, l4, lines going counterclockwise around box @param relax_factor, how much to relax restriction for image size @returns four points going counterclockwise, or None is points are not valid - ''' - p1 = self._get_line_intersection(l1,l2) - p2 = self._get_line_intersection(l2,l3) - p3 = self._get_line_intersection(l3,l4) - p4 = self._get_line_intersection(l4,l1) - d1 = self._distance(p1,p2) - d2 = self._distance(p2,p3) - d3 = self._distance(p3,p4) - d4 = self._distance(p4,p1) - relax = 1000*relax_factor - if (d1<600-relax or d1>800+relax or d3<600-relax or d3>800+relax or d2<50 or - d2>300+relax or d4<50 or d4>300+relax or self._is_not_convex(p1,p2,p3,p4)): + """ + p1 = self._get_line_intersection(l1, l2) + p2 = self._get_line_intersection(l2, l3) + p3 = self._get_line_intersection(l3, l4) + p4 = self._get_line_intersection(l4, l1) + d1 = self._distance(p1, p2) + d2 = self._distance(p2, p3) + d3 = self._distance(p3, p4) + d4 = self._distance(p4, p1) + relax = 1000 * relax_factor + if ( + d1 < 600 - relax + or d1 > 800 + relax + or d3 < 600 - relax + or d3 > 800 + relax + or d2 < 50 + or d2 > 300 + relax + or d4 < 50 + or d4 > 300 + relax + or self._is_not_convex(p1, p2, p3, p4) + ): return None - return (p1,p2,p3,p4) + return (p1, p2, p3, p4) - def _get_line_intersection(self,line1:list,line2:list): - ''' + def _get_line_intersection(self, line1: list, line2: list): + """ Gets intersection of two lines. @param line1, line2, lines given in form (rho,theta) @return point (x,y) of their intersection, or (0,0) if parallel - ''' + """ rho1, theta1 = line1 rho2, theta2 = line2 a1 = np.cos(theta1) a2 = np.sin(theta1) b1 = np.cos(theta2) b2 = np.sin(theta2) - d = a1*b2 - a2*b1 - if d==0: - return (0,0) - x = (rho1*b2-rho2*a2) / d - y = (-rho1*b1+rho2*a1) / d - return (x,y) - - def _distance(self,pt1:list,pt2:list): - ''' + d = a1 * b2 - a2 * b1 + if d == 0: + return (0, 0) + x = (rho1 * b2 - rho2 * a2) / d + y = (-rho1 * b1 + rho2 * a1) / d + return (x, y) + + def _distance(self, pt1: list, pt2: list): + """ @param pt1, pt2, points given as (x,y) in pixels @returns Euclidean distnace between points - ''' - return ((pt1[0]-pt2[0])**2 + (pt1[1]-pt2[1])**2)**0.5 + """ + return ((pt1[0] - pt2[0]) ** 2 + (pt1[1] - pt2[1]) ** 2) ** 0.5 - def _is_not_convex(self,*pts:list): - ''' + def _is_not_convex(self, *pts: list): + """ Checks if set of points is convex @param *pts, set of points given in order and as (x,y) @returns True, if not convex, False if convex - ''' + """ N = len(pts) prev, curr = 0, 0 for i in range(N): - temp = [pts[i],pts[(i+1)%N],pts[(i+2)%N]] + temp = [pts[i], pts[(i + 1) % N], pts[(i + 2) % N]] curr = self._cross_product(temp) if curr != 0: if curr * prev < 0: @@ -387,42 +442,42 @@ def _is_not_convex(self,*pts:list): prev = curr return False - def _cross_product(self,A:list): - ''' + def _cross_product(self, A: list): + """ @param A, list of 3 vectors of dim 2 @returns cross product of 2 vectors - ''' - X1 = (A[1][0] - A[0][0]) - Y1 = (A[1][1] - A[0][1]) - X2 = (A[2][0] - A[0][0]) - Y2 = (A[2][1] - A[0][1]) - return (X1 * Y2 - Y1 * X2) - - def _regress_box_boundary(self,pts_src,delta=range(1,40)): - ''' + """ + X1 = A[1][0] - A[0][0] + Y1 = A[1][1] - A[0][1] + X2 = A[2][0] - A[0][0] + Y2 = A[2][1] - A[0][1] + return X1 * Y2 - Y1 * X2 + + def _regress_box_boundary(self, pts_src, delta=range(1, 40)): + """ Adjust box boundary to get better homography @param pts_src, list of four points of box boundary, counterclockwise @param delta, amount to change court boundary @returns True, if box boundary was adjusted, and False otherwise - ''' + """ if pts_src is None: return False - prev_good = self._evaluate_homography(pts_src,self._BOX_BOUNDS) + prev_good = self._evaluate_homography(pts_src, self._BOX_BOUNDS) box_bounds = [] for i in [0]: - for j in [-1,1]: + for j in [-1, 1]: for d in delta: # copy = self.BOX_BOUNDARY.copy() # copy[i:i+2,0] += delta*j # box_bounds.append(copy) copy = self._BOX_BOUNDS.copy() - copy[[i-1,i],1] += d*j + copy[[i - 1, i], 1] += d * j box_bounds.append(copy) max_good = 0 max_index = 0 for i in range(len(box_bounds)): - good = self._evaluate_homography(pts_src,box_bounds[i]) + good = self._evaluate_homography(pts_src, box_bounds[i]) if good > max_good: max_good = good max_index = i @@ -433,122 +488,125 @@ def _regress_box_boundary(self,pts_src,delta=range(1,40)): self._BOX_BOUNDS = box_bounds[max_index] return True - def _fine_regress_box_boundary(self,pts_src,delta=range(1,5)): - ''' + def _fine_regress_box_boundary(self, pts_src, delta=range(1, 5)): + """ Finely adjust box boundary to get better homography @param pts_src, list of four points of box boundary, counterclockwise @param delta, amount to change court boundary @returns True, if box boundary was adjusted, and False otherwise - ''' + """ if pts_src is None: return False - prev_good = self._evaluate_homography(pts_src,self._BOX_BOUNDS) + prev_good = self._evaluate_homography(pts_src, self._BOX_BOUNDS) box_bounds = [] - for i in [0,1,2,3]: - for j in [0,1]: + for i in [0, 1, 2, 3]: + for j in [0, 1]: epsilon = random.random() - for k in [-epsilon,epsilon]: + for k in [-epsilon, epsilon]: for d in delta: copy = self._BOX_BOUNDS.copy() - copy[i,j] += d*k + copy[i, j] += d * k box_bounds.append(copy) max_good = 0 max_index = 0 for i in range(len(box_bounds)): - good = self._evaluate_homography(pts_src,box_bounds[i]) + good = self._evaluate_homography(pts_src, box_bounds[i]) if good > max_good: max_good = good max_index = i - if max_good <= prev_good*1.00001: + if max_good <= prev_good * 1.00001: return False else: self._BOX_BOUNDS = box_bounds[max_index] return True - def _apply_hough(self,img:np.ndarray, lines:list): - ''' + def _apply_hough(self, img: np.ndarray, lines: list): + """ Draws lines from hough transformation onto image @param img, bgr image of court @param lines, list of lines given by Hough Transform @returns image with lines drawn on - ''' + """ out = img.copy() for line in lines: rho, theta = line[0] a, b = np.cos(theta), np.sin(theta) - x0, y0 = a*rho, b*rho - x1, y1 = int(x0 + 2000*(-b)), int(y0 + 2000*(a)) - x2, y2 = int(x0 - 2000*(-b)), int(y0 - 2000*(a)) - cv.line(out,(x1,y1),(x2,y2),[0,0,255]) + x0, y0 = a * rho, b * rho + x1, y1 = int(x0 + 2000 * (-b)), int(y0 + 2000 * (a)) + x2, y2 = int(x0 - 2000 * (-b)), int(y0 - 2000 * (a)) + cv.line(out, (x1, y1), (x2, y2), [0, 0, 255]) return out - - def _apply_gray_homography(self,im_src:np.ndarray, pts_src:list, pts_dst=None, or_mask=False): - ''' + def _apply_gray_homography( + self, im_src: np.ndarray, pts_src: list, pts_dst=None, or_mask=False + ): + """ Return warped image given list of four pts @Preconditions: im_src is grayscale image of masked edges src_pts: list of fours (x,y)* starting at back right corner of box and looping around counterclockwise or_mask: lets us see all parts of both truth map and homographied image - ''' + """ im_dst = self._TRUTH_COURT_MAP.copy() if pts_dst is None: pts_dst = self._BOX_BOUNDS pts_src = np.array(pts_src) - h, _ = cv.findHomography(pts_src,pts_dst) - im_out = cv.warpPerspective(im_src, h, (im_dst.shape[1],im_dst.shape[0])) + h, _ = cv.findHomography(pts_src, pts_dst) + im_out = cv.warpPerspective(im_src, h, (im_dst.shape[1], im_dst.shape[0])) if or_mask: - return cv.bitwise_or(im_out,self._invert_grayscale(im_dst)) + return cv.bitwise_or(im_out, self._invert_grayscale(im_dst)) else: - return cv.bitwise_and(im_out,self._invert_grayscale(im_dst)) - - def _max_pixel_overlap(self,im_src:np.ndarray, pts_src:list, pts_dst=None): - ''' + return cv.bitwise_and(im_out, self._invert_grayscale(im_dst)) + + def _max_pixel_overlap(self, im_src: np.ndarray, pts_src: list, pts_dst=None): + """ Returns max number of pixels homography can obtain given camera viewport @Preconditions: im_src is grayscale image of masked edges src_pts: list of fours (x,y)* starting at back right corner of box and looping around counterclockwise - ''' - all_white = np.full_like(im_src,255) - max_overlap = self._apply_gray_homography(all_white,pts_src,pts_dst=pts_dst) + """ + all_white = np.full_like(im_src, 255) + max_overlap = self._apply_gray_homography(all_white, pts_src, pts_dst=pts_dst) return np.count_nonzero(max_overlap > 100) - def _apply_bgr_homography(self,im_src:np.ndarray, pts_src:list): - ''' + def _apply_bgr_homography(self, im_src: np.ndarray, pts_src: list): + """ Return warped bgr image given list of four pts @Preconditions: im_src is bgr image of court src_pts: list of fours (x,y)* starting at back right corner of box and looping around counterclockwise - ''' + """ im_dst = self._TRUTH_COURT_MAP.copy() pts_dst = self._BOX_BOUNDS pts_src = np.array(pts_src) - h, _ = cv.findHomography(pts_src,pts_dst) - im_out = cv.warpPerspective(im_src, h, (im_dst.shape[1],im_dst.shape[0])) - return cv.bitwise_or(im_out,cv.cvtColor(self._invert_grayscale(im_dst),cv.COLOR_GRAY2BGR)) - - def _convert_to_hsv(self,img_path:str, img:np.ndarray=None): - ''' + h, _ = cv.findHomography(pts_src, pts_dst) + im_out = cv.warpPerspective(im_src, h, (im_dst.shape[1], im_dst.shape[0])) + return cv.bitwise_or( + im_out, cv.cvtColor(self._invert_grayscale(im_dst), cv.COLOR_GRAY2BGR) + ) + + def _convert_to_hsv(self, img_path: str, img: np.ndarray = None): + """ Helper function converts bgr image to HSV and separates into channels @param img_path, file path to source image @param img, bgr image of court if already read in @returns nothing, just displays separated color channels - ''' + """ if img is None: img = cv.imread(img_path) - hsv = cv.cvtColor(img,cv.COLOR_BGR2HSV) + hsv = cv.cvtColor(img, cv.COLOR_BGR2HSV) hue_only = hsv.copy() - hue_only[:,:,1] = 255 - hue_only[:,:,2] = 255 + hue_only[:, :, 1] = 255 + hue_only[:, :, 2] = 255 sat_only = hsv.copy() - sat_only[:,:,0] = 0 - sat_only[:,:,2] = 255 + sat_only[:, :, 0] = 0 + sat_only[:, :, 2] = 255 val_only = hsv.copy() - val_only[:,:,0] = 0 - val_only[:,:,1] = 255 + val_only[:, :, 0] = 0 + val_only[:, :, 1] = 255 hue_only = cv.cvtColor(hue_only, cv.COLOR_HSV2BGR) sat_only = cv.cvtColor(sat_only, cv.COLOR_HSV2BGR) @@ -559,29 +617,29 @@ def _convert_to_hsv(self,img_path:str, img:np.ndarray=None): cv.imshow("Saturation Only", sat_only) cv.imshow("Value Only", val_only) - def _convert_to_ycrcb(self,img_path:str, img:np.ndarray=None): - ''' + def _convert_to_ycrcb(self, img_path: str, img: np.ndarray = None): + """ Helper function converts bgr image to YCrCb and separates into channels @param img_path, file path to source image @param img, bgr image of court if already read in @returns nothing, just displays separated color channels - ''' + """ if img is None: img = cv.imread(img_path) - ycrcb = cv.cvtColor(img,cv.COLOR_BGR2YCrCb) + ycrcb = cv.cvtColor(img, cv.COLOR_BGR2YCrCb) luma_only = ycrcb.copy() - luma_only[:,:,1] = 128 - luma_only[:,:,2] = 128 + luma_only[:, :, 1] = 128 + luma_only[:, :, 2] = 128 blue_only = ycrcb.copy() - blue_only[:,:,0] = 0 - blue_only[:,:,2] = 128 + blue_only[:, :, 0] = 0 + blue_only[:, :, 2] = 128 red_only = ycrcb.copy() - red_only[:,:,0] = 0 - red_only[:,:,1] = 128 + red_only[:, :, 0] = 0 + red_only[:, :, 1] = 128 luma_only = cv.cvtColor(luma_only, cv.COLOR_YCrCb2BGR) blue_only = cv.cvtColor(blue_only, cv.COLOR_YCrCb2BGR) @@ -592,65 +650,75 @@ def _convert_to_ycrcb(self,img_path:str, img:np.ndarray=None): cv.imshow("Blue Difference Only", blue_only) cv.imshow("Red Difference Only", red_only) - def _invert_grayscale(self,gray_img:np.ndarray): - ''' + def _invert_grayscale(self, gray_img: np.ndarray): + """ Inverts grayscale images @param gray_img, gray scale image to invert. @returns inverted grayscale image - ''' + """ ret = gray_img.copy() - ret[ret[:,:] >= 128] = 128 - ret[ret[:,:] < 128] = 255 - ret[ret[:,:] == 128] = 0 + ret[ret[:, :] >= 128] = 128 + ret[ret[:, :] < 128] = 255 + ret[ret[:, :] == 128] = 0 return ret - def _test_many_bins(self,hsv:np.ndarray, bgr:np.ndarray, bins:list, iterations:int=6): - ''' + def _test_many_bins( + self, hsv: np.ndarray, bgr: np.ndarray, bins: list, iterations: int = 6 + ): + """ Helper function to see what colors are in the bins @param hsv, hsv image of court @param bgr, bgr image of court @param bins, list of all color bins in order @param iterations, number of bins to test @returns nothing, just displays masking of bins on court image - ''' + """ for i in range(min(iterations, len(bins))): mask = self._get_mask(hsv, bins[i], morph=False) - masked = self._apply_mask(bgr,mask) - cv.imshow('Bin Level ' + str(i+1), masked) + masked = self._apply_mask(bgr, mask) + cv.imshow("Bin Level " + str(i + 1), masked) - def _test_many_canny(self,gray_img:np.ndarray, mask:np.ndarray, grid:list): - ''' + def _test_many_canny(self, gray_img: np.ndarray, mask: np.ndarray, grid: list): + """ Helper function to see different threshold levels of canny edge detection @param gray_img, grayscale image of court @param mask, mask over floor @param grid, list of tuples (one,two) for thresholds to test in canny @returns nothing, just display images of canny edges - ''' + """ for one in grid: for two in grid: if one > two: continue canny = self._get_canny(gray_img, threshold1=one, threshold2=two) masked_canny = self._apply_mask(canny, mask) - cv.imshow(str(one)+' by '+str(two), masked_canny) + cv.imshow(str(one) + " by " + str(two), masked_canny) - def _test_many_hough(self,gray_img:np.ndarray, canny:np.ndarray, grid:list): - ''' + def _test_many_hough(self, gray_img: np.ndarray, canny: np.ndarray, grid: list): + """ Helper function to test many hough lines of different thresholds @param gray_img, grayscale image of court to draw on @param canny, image of court edges @param grid, list of floats for threshold values for hough transform - ''' + """ for rho in grid[0]: for theta in grid[1]: for threshold in grid[2]: lines = self._get_hough(canny, rho, theta, threshold) hough = self._apply_hough(gray_img, lines) - cv.imshow(str(rho)+' by '+str(round(theta,3))+' by '+str(threshold), hough) - -if __name__ == '__main__': - video_path = os.path.join('data','training_data.mp4') - render = Render(video_path=video_path,display_images=True) + cv.imshow( + str(rho) + + " by " + + str(round(theta, 3)) + + " by " + + str(threshold), + hough, + ) + + +if __name__ == "__main__": + video_path = os.path.join("data", "training_data.mp4") + render = Render(video_path=video_path, display_images=True) # x,y = 800,460 # x1, y1 = render._transform_point(x,y) diff --git a/src/processing/general_detect.py b/src/processing/general_detect.py deleted file mode 100644 index 7777a9cb..00000000 --- a/src/processing/general_detect.py +++ /dev/null @@ -1,148 +0,0 @@ -from typing import Tuple, List, Dict -from state import BallState, GameState - -def parse_output(state: GameState, output_path: str): - """ - Parses the player output file into a list of dictionaries. - File is in the format: - Each line represents a frame and contains the following information: - , , , , - - Object types: 0 - ball, 1 - player, 2 - rim - Input: - state [GameState]: GameState object - output_path [str]: path to output file - Output: - rim_info [dict]: dictionary containing rim coordinates - frames [list]: list of dictionaries containing frame information - """ - frames = [] - rim_info = {} - with open(output_path, 'r') as file: - lines = file.readlines() - curr_frame = int(lines[0].split()[0]) - rim = True - frame_info = {"frameno": curr_frame, "players": {}} - for line in lines: - curr = line.split() - if curr_frame != curr[0]: - frames.append(frame_info) - frame_info = {"frameno": int(curr[0]), "players": {}} - curr_frame = curr[0] - if curr[1] == '1': - frame_info['players']['player' + curr[2]] = { - 'xmin': int(curr[3]), - 'ymin': int(curr[4]), - 'xmax': int(curr[3]) + int(curr[5]), - 'ymax': int(curr[4]) + int(curr[6]) - } - elif rim and curr[1] == '2': - rim_info = { - 'xmin': int(curr[3]), - 'ymin': int(curr[4]), - 'xmax': int(curr[3]) + int(curr[5]), - 'ymax': int(curr[4]) + int(curr[6]) - } - rim = False - frames.append(frame_info) - return rim_info, frames - - -def parse_ball(state: GameState, ball_out) -> None: - """ - Reads the ball output and updates state.states. - Input: - state [GameState]: GameState object - ball_out [str]: path to ball output file - """ - with open(ball_out, 'r') as file: - lines = file.readlines() - curr_frame = lines[0].split()[0] - idx = 0 - for i, frame in enumerate(state.states): - if frame.get("frameno") == int(curr_frame): - idx = i - break - frame_info = {} - for line in lines: - curr = line.split() - if curr_frame != curr[0]: - state.states[idx].update(frame_info) - for i in range(idx, len(state.states)): - if state.states[i].get("frameno") == int(curr[0]): - idx = i - break - frame_info = {} - curr_frame = curr[0] - if int(curr[5]) > 200 or int(curr[6]) > 200: - continue - frame_info['ball'] = { - 'xmin': int(curr[3]), - 'ymin': int(curr[4]), - 'xmax': int(curr[3]) + int(curr[5]), - 'ymax': int(curr[4]) + int(curr[6]) - } - return None - - -def player_passes(pos_lst) -> List[Tuple[int, int, int, int, int]]: - """ - Input: - pos_lst [list]: list of ball possession tuples - Output: - passes [list[tuple]]: Returns a list of passes with each pass - represented as a tuple of the form - (pass_id, passerid, receiverid, - start_frame, end_frame) - """ - passes = [] - curr_player = pos_lst[0][0] - curr_end_frame = pos_lst[0][2] - for i in range(1, len(pos_lst)): - curr_pass = (i, curr_player, pos_lst[i][0], curr_end_frame+1, - pos_lst[i][1]-1) - passes.append(curr_pass) - curr_player = pos_lst[i][0] - curr_end_frame = pos_lst[i][2] - return passes - - -def ball_state_update(pos_lst: list, lastframe) -> List[Tuple[int, int, BallState]]: - """ - Reads in a possession list and the last frame of the game - Returns: List[frame1, frame2, BallState] that partitions each interval of - frames by ballstate - """ - # [(start_frame, end_frame, BallState)] - ball_state = [] - if pos_lst[0][1] != 0: - ball_state.append((0, pos_lst[0][1]-1, BallState.OUT_OF_PLAY)) - ball_state.append( - (pos_lst[0][1], pos_lst[0][2], BallState.IN_POSSESSION)) - curr_frame = pos_lst[0][2]+1 - for i in range(1, len(pos_lst)): - ball_state.append( - (curr_frame, pos_lst[i][1]-1, BallState.IN_TRANSITION)) - ball_state.append( - (pos_lst[i][1], pos_lst[i][2], BallState.IN_POSSESSION)) - curr_frame = pos_lst[i][2]+1 - ball_state.append((curr_frame, lastframe, BallState.OUT_OF_PLAY)) - return ball_state - - -def player_possession(pos_lst) -> Dict[int, List[Tuple[int, int]]]: - """ - Input: - pos_lst [list]: list of ball possession tuples - Output: - player_pos {dict}: Returns a dictionary with the player id as the - key and a list of tuples of the form - (start_frame, end_frame) as the value - """ - player_pos = {} - for pos in pos_lst: - if pos[0] not in player_pos: - player_pos[pos[0]] = [(pos[1], pos[2])] - else: - player_pos[pos[0]].append((pos[1], pos[2])) - return player_pos diff --git a/src/processing/parse.py b/src/processing/parse.py new file mode 100644 index 00000000..29df3ca2 --- /dev/null +++ b/src/processing/parse.py @@ -0,0 +1,49 @@ +""" +Parsing module for parsing all +models outputs into the state +""" +from state import GameState, Frame, ObjectType + + +def parse_sort_output(state: GameState, sort_output) -> None: + """ + Reads the SORT output and updates state.states frame-by-frame. + Input: + state [GameState]: GameState object + sort_output [str]: path to SORT output file (right now for STRONGSORT) + Assumptions: + Frame numbers are shared between outpute files + If rim is not detected, the rim from the previous frame will be supplied + Object type number given in state.ObjectType + Based on StrongSORT output + """ + file = open(sort_output, "r") + lines = [[int(x) for x in line.split()] for line in file.readlines()] + file.close() + + sts = state.frames + b = 0 # index of line in ball + s = 0 # index of state + while b < len(lines): + frame, obj_type, id, xmin, ymin, xwidth, ywidth = lines[b][:7] + if s >= len(sts): # s-1 frameno < bframe, s = len(states) + sts.append(Frame(frame)) + elif frame < sts[s].frameno: # s-1 frameno < bframe < s frameno + sts.insert(s, Frame(frame)) + elif frame > sts[s].frameno: + if sts[s].rim is None and s > 0: + sts[s].rim = sts[s - 1].rim # ensure rim set + s += 1 + continue + + sF: Frame = sts[s] + assert sF.frameno == frame + box = (xmin, ymin, xmin + xwidth, ymin + ywidth) + if obj_type is ObjectType.BALL.value: + sF.set_ball_frame(id, *box) + elif obj_type is ObjectType.PLAYER.value: + sF.add_player_frame(id, *box) + elif obj_type is ObjectType.RIM.value: + sF.set_rim_box(id, *box) + + b += 1 # process next line diff --git a/src/processing/render.py b/src/processing/render.py new file mode 100644 index 00000000..425fd6d3 --- /dev/null +++ b/src/processing/render.py @@ -0,0 +1,159 @@ +""" +Video Rendering module for courtline detection and video reencoding. +""" +import cv2 as cv +import random +import os +import numpy as np +import subprocess +import sys + +from state import GameState + + +# pass in homo matrix + + +# implement video reencoding +class VideoRender: + def __init__(self, homography): + self._TRUE_PATH = os.path.join("data", "true_map.png") + self._TRUTH_COURT_MAP = cv.imread(self._TRUE_PATH, cv.IMREAD_GRAYSCALE) + self._HOMOGRAPHY = homography + + def reencode(self, input_path, output_path): + """ + Re-encodes a MPEG4 video file to H.264 format. Overrides existing output videos if present. + Deletes the unprocessed video when complete. + Ensures ffmpeg dependency is installed + """ + + if sys.platform != "darwin": + print("Designed to install dependency for macOS") + else: + try: + # check if it is installed already + subprocess.run( + ["brew", "list", "ffmpeg"], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + except: + # install dependency + print("Installing ffmpeg") + try: + subprocess.run( + ["brew", "install", "ffmpeg"], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + except: + print("Error installing ffmpeg") + return + + reencode_command = ( + f"ffmpeg -y -i {input_path} -vcodec libx264 -c:a copy {output_path}" + ) + os.system(reencode_command) + # os.remove(input_path) + + def render_video(self, state: GameState, filename: str, fps: int = 30): + """ + Takes into player position data, applied homography, + and renders video stored in filename + state: GameState with at least bounding boxes on it + filename: file path from project root where video is saved + fps: frames per second expected of produced video + """ + frames = state.frames + players = list(state.players.keys()) + # Create a blank image to use as the background for each frame + background = cv.cvtColor(self._TRUTH_COURT_MAP, cv.COLOR_GRAY2BGR) + height, width, _ = background.shape + + # Initialize the video writer + fourcc = cv.VideoWriter_fourcc(*"mp4v") + video_writer = cv.VideoWriter(filename, fourcc, fps, (width, height)) + + # Define initial positions for each player + player_state: dict[str, dict] = {} + for id in players: + player_state.update( + { + id: { + "pos": (0, 0), + "detected": False, + "color": ( + random.randint(0, 256), + random.randint(0, 256), + random.randint(0, 256), + ), + } + } + ) + + # find duration of video + dur = frames[-1].frameno + fi = 0 + # Loop through each time step + for t in range(dur + 1): + # Create a copy of the background image to draw the points on + frame = background.copy() + + # Get dictionary of positions at each frame + for id in player_state: + player_state.get(id).update({"detected": False}) # reset detection + while fi < len(frames) and frames[fi].frameno <= t: + f = frames[fi] + for id in players: # update pos for each player + if id in f.players: + b = f.players.get(id).box # get new player frame + x, y = (b.xmin + b.xmax) / 2.0, b.ymax + x, y = self._transform_point(x, y) + player_state.get(id).update({"pos": (x, y)}) + player_state.get(id).update({"detected": True}) + fi += 1 + + # Loop through each point and draw it on the frame + for id in players: + if not player_state.get(id).get("detected"): + continue + pos = player_state[id]["pos"] + pos = (int(pos[0]), int(pos[1])) + color = player_state[id]["color"] + font = cv.FONT_HERSHEY_SIMPLEX + thickness = 2 + font_scale = 1 + radius = 10 + text_width = cv.getTextSize(id, font, font_scale, thickness)[0][0] + cv.circle( + img=frame, center=pos, radius=radius, color=color, thickness=-1 + ) + cv.putText( + img=frame, + text=id, + org=(pos[0] - (text_width // 2), pos[1] - radius - 10), + fontFace=font, + fontScale=font_scale, + color=color, + thickness=thickness, + lineType=cv.LINE_AA, + ) + + # Write the frame to the video writer + video_writer.write(frame) + + # Release the video writer + video_writer.release() + + def _transform_point(self, x: float, y: float): + """ + Applies court homography to single point + @param x,y pixel positions of point on court video + @returns transformed pixels x,y positions on true court + """ + point = np.array([x, y], dtype=np.float32) + point = point.reshape((1, 1, 2)) + transformed_point = cv.perspectiveTransform(point, self._HOMOGRAPHY) + tx, ty = transformed_point[0, 0] + return tx, ty diff --git a/src/processing/shot.py b/src/processing/shot.py new file mode 100644 index 00000000..8bfe5a68 --- /dev/null +++ b/src/processing/shot.py @@ -0,0 +1,60 @@ +from state import GameState, ShotAttempt, Box, Interval + + +class ShotFrame: + def __init__(self): + self.top: bool = False + self.rim: bool = False + + +def detect_shot(state: GameState, inte: Interval, window: int) -> ShotAttempt: + """ + Returns a ShotAttempt analyzing frames along the interval + window: margin of error in frames from top to rim box + """ + sa = ShotAttempt(inte.playerid, inte.start, inte.end) + sfs: list[ShotFrame] = [] + for i in range(sa.start, sa.end + 1): # construct sfs of shot frames + rim = state.frames[i].rim + h = rim.ymax - rim.ymin + top = Box(rim.xmin, rim.ymin - h, rim.xmax, rim.ymax - h) # top rim box + ball = state.frames[i].ball + + sf = ShotFrame() + if ball.box.intersects(top): + sf.top = True + if rim.contains(ball.box): + sf.rim = True + sfs.append(sf) + + r = 0 # freq rim intersects in window + for i in range(len(sfs) - window): + if sfs[i + window].rim: + r += 1 # add to window + if sfs[i].top and r > 0: + sa.made = True # made shot detected + sa.frame = i + sa.start + if sfs[i].rim: + r -= 1 # remove from window + + return sa + + +def shots(state: GameState, window: int): + """ + Calculate shots throughout game + window: margin of error in frames from top to rim box + Assumption: + shots are counted as breaks between possesssions + """ + # Create intervals of shots + shots: list[Interval] = [] + poss = state.possessions + for i in range(len(poss) - 1): + p1 = poss[i] + p2 = poss[i + 1] + shots.append(Interval(p1.playerid, p1.end, p2.start)) + + for inte in shots: + sa: ShotAttempt = detect_shot(state, inte, window=window) + state.shots.append(sa) diff --git a/src/processing/shot_detect.py b/src/processing/shot_detect.py deleted file mode 100644 index 96aff6c5..00000000 --- a/src/processing/shot_detect.py +++ /dev/null @@ -1,108 +0,0 @@ -from typing import List, Tuple -def new_rim(file_path): - """ - TODO is this function used? - Accepts a strongsort text file path as an input, and outputs the coordinates - of the original rim and the new box above it. Assumption: the coordinates of - the rim do not move (constant) throughout the video. - """ - # Initialize values - left = 0 - top = 0 - width = 0 - height = 0 - with open(file_path, 'r') as file: - lines = file.readlines() - for line in lines: - # Convert each text file line into an int list - lst = [int(i) for i in line.split()] - - # Rim index = 2 - if lst[1] == 2: - left = lst[3] - top = lst[4] - width = lst[5] - height = lst[6] - - # The top_box is simply a height above the rim_box - top_box = left, top+height, width, height - rim_box = left, top, width, height - return top_box, rim_box - - -def madeshot(file_path) -> List[Tuple[int, int]]: - """ - Accepts a strongsort text file path as an input, and outputs the frame - intervals in which a shot was made. Ex: If a shot was made between the - frames 0 and 10, the output of madeshot() will be [(0, 10)]. - """ - # Initialize values - top, rim = new_rim(file_path) - shots_made = [] - passed_top = False - passed_rim = False - top_collision_start = 0 - top_collision_end = 0 - rim_collision_start = 0 - rim_collision_end = 0 - with open(file_path, 'r') as f: - lines = f.readlines() - for line in lines: - # Convert each text file line into an int list - lst = [int(i) for i in line.split()] - frame = lst[0] - - # Ball index = 0 - if lst[1] == 0: - # Take the center coordinates of the ball - ball_x = lst[3] + lst[5]/2 - ball_y = lst[4] - lst[6]/2 - - # Check to see if ball is in top box - in_top_x = (ball_x >= top[0] and ball_x <= top[0] + top[2]) - in_top_y = (ball_y <= top[1] and ball_y >= top[1] - top[3]) - if not passed_top: - if in_top_x and in_top_y and (frame not in - range(top_collision_start, - top_collision_end)): - top_collision_start = frame - passed_top = True - else: - if not in_top_x or not in_top_y: - top_collision_end = frame - - - # Check to see if ball is in rim box - in_rim_x = (ball_x >= rim[0] and ball_x <= rim[0] + rim[2]) - in_rim_y = (ball_y <= rim[1] and ball_y >= rim[1] - rim[3]) - if not passed_rim: - if in_rim_x and in_rim_y and (frame not in - range(rim_collision_start, - rim_collision_end)): - rim_collision_start = frame - passed_rim = True - else: - if not in_rim_x or not in_rim_y: - rim_collision_end = frame - - # If the ball has passed both the top and the rim box, then - # a shot has been made, and is added to the shots_made list - if passed_top and passed_rim: - shots_made.append((top_collision_start, rim_collision_end)) - - # Reset all the values - passed_top = False - passed_rim = False - top_collision_start = 0 - top_collision_end = 0 - rim_collision_start = 0 - rim_collision_end = 0 - - # return shots_made - # TODO this algorithm currently does not work for the current model outputs. - # TEMPORARY FIX: return the results ran on an older output - return [(131, 131), (132, 0), (629, 629), (630, 0), (1244, 0), (1561, 1561), (1562, 0)] - -if __name__ == '__main__': - print(new_rim('tmp/people.txt')) - print(madeshot('tmp/people.txt')) diff --git a/src/processing/team.py b/src/processing/team.py new file mode 100644 index 00000000..89c7c60b --- /dev/null +++ b/src/processing/team.py @@ -0,0 +1,53 @@ +from state import GameState +import numpy as np +from sklearn.cluster import SpectralClustering + +""" +Team Detection + +This module contains functions that will find the best team split. +""" + + +def passing_matrix(state: GameState, p_list: list): + "computes passing matrix of a state" + n = len(p_list) + graph = np.zeros((n, n)) + for i in range(n): + for j in range(n): + graph[i][j] = state.passes[p_list[i]][p_list[j]] + graph = graph + graph.T # directed -> undirected + return graph + + +def sparest_cut_weighted(graph): + "get sparest cut given pass frequency matrix" + # Perform spectral clustering + clustering = SpectralClustering( + n_clusters=2, affinity="precomputed", random_state=0, eigen_solver="arpack" + ) + labels = clustering.fit_predict(graph) + + # Calculate sparsity cut for weighted graph + cut_size = np.sum(graph[labels == 0][:, labels == 1]) + sparsity = cut_size / min(np.sum(labels == 0), np.sum(labels == 1)) + + return labels, sparsity + + +def split_team(state: GameState): + "splits players into two teams" + n = len(state.players) + p_list = list(state.players.keys()) + + graph = passing_matrix(state, p_list) + labels, _ = sparest_cut_weighted(graph) + assert len(labels) == n + + state.team1.clear() + state.team2.clear() + for i in range(n): + if labels[i] == 0: + state.team1.add(p_list[i]) + else: # label[i] == 1 + state.team2.add(p_list[i]) diff --git a/src/processing/team_detect.py b/src/processing/team_detect.py deleted file mode 100644 index 29b5fddc..00000000 --- a/src/processing/team_detect.py +++ /dev/null @@ -1,233 +0,0 @@ -from typing import Tuple -""" -Team Detection and Possession Finder - -This module contains functions that will find the best team split. It will -also create a list of players in the order of ball possession. -""" - - -def curr_possession(players, ball): - ''' - Input: - players: dictionary of players - ball: dictionary containing coords of the ball - Output: - possession_list [list]: list of player ids in the order of ball - possession throughout the video - ''' - player_ids = players.keys() - max_area = 0 - max_player = None - bxmin = ball.get("xmin") - bxmax = ball.get("xmax") - bymin = ball.get("ymin") - bymax = ball.get("ymax") - for player in player_ids: - pcoords = players.get(player) - curr_xmin = max(pcoords.get("xmin"), bxmin) - curr_xmax = min(pcoords.get("xmax"), bxmax) - curr_ymin = max(pcoords.get("ymin"), bymin) - curr_ymax = min(pcoords.get("ymax"), bymax) - if curr_xmin >= curr_xmax or curr_ymin >= curr_ymax: - continue - area = (curr_xmax - curr_xmin) * (curr_ymax - curr_ymin) - if area > max_area: - max_area = area - max_player = player - return max_player - - -def possession_list(frames, player_list, thresh=20): - ''' - Input: - state: a StatState class that holds all sorts of information - on the video - thresh: number of frames for ball overlapping with player - in order to count as possession - Output: - possession_list [list(tuples)]: list of tuples of the form - (playerid, startframe, endframe) - ''' - # list of frames where a frame is a dictionary with key classes and - # values xmin, ymin, xmax, ymax - # for key = players value has the format {players: - # {playerid1: {xmin: val, ymin: val, xmax: val, ymax: val}}} - states = frames - counter = 0 - current_player = None - pos_lst = [] - for frame in states: - if frame.get("ball") is None: - continue - players = frame.get("players") - poss = curr_possession(players, frame.get("ball")) - if poss is None: - continue - if poss == current_player: - counter += 1 - elif poss not in player_list: - continue - else: - if counter >= thresh: - pos_lst.append((current_player, int(frame.get( - "frameno"))-counter, frame.get("frameno"))) - current_player = poss - counter = 1 - # Can make this more robust by allowing for a certain number of - # frames where the ball is not in the possession of any player - # or of a different player - return pos_lst - -# def connections(pos_lst, players): -# """ -# Input: -# pos_lst [list]: list of player ids in the order of ball possession -# throughout the video -# players [list]: list of player ids -# Output: -# connects [dict]: dictionary of connections between players -# """ -# connects = {} -# for i, player in enumerate(players): -# for j, player2 in enumerate(players): -# if i == j: -# continue -# name = player + player2 -# connects.update({name: 0}) - -# curr = pos_lst[0] -# for i in range(1, len(pos_lst)): -# name = curr + pos_lst[i] -# connects[name] += 1 -# return connects - - -def connections(pos_lst, players, player_idx): - """ - Input: - pos_lst [list]: list of player ids in the order of ball possession - throughout the video - players [list]: list of player ids - player_idx [dict]: dictionary of player ids to their index in the - players list - Output: - connects [list of lists]: 2D array of connections between players where - connects[i][j] is the number of times - player i passes to player j - """ - connects = [[0 for _ in range(len(players))] for _ in range(len(players))] - for i in range(0, len(pos_lst)-1): - connects[player_idx.get(pos_lst[i][0]) - ][player_idx.get(pos_lst[i+1][0])] += 1 - return connects - - -def possible_teams(players): - """ - Input: - players [list]: list of player ids - Output: - acc [list]: list of possible team splits - """ - num_people = len(players) - - acc = [] - def permutation(i, t): - if i >= num_people: - return - if len(t) == ppl_per_team: - acc.append((t, (set(players) - set(t)))) - else: - permutation(i+1, t.copy()) - t.add(players[i]) - permutation(i+1, t.copy()) - - if num_people % 2 != 0: - ppl_per_team = int(num_people/2) + 1 - permutation(0, set()) - ppl_per_team -= 1 - permutation(0, set()) - else: - ppl_per_team = int(num_people/2) - permutation(0, set()) - return acc - - -def get_playerlist(frames, playerthresh=100): - ''' - Input: - frames: list of frames where a frame is a dictionary with key classes and - values xmin, ymin, xmax, ymax - for key = players value has the format {players: - {playerid1: {xmin: val, ymin: val, xmax: val, ymax: val}}} - Output: - player_list [list]: list of player ids - ''' - playerframecount = {} - player_list = [] - for frame in frames: - players = frame.get("players") - for player in players: - if player not in playerframecount: - playerframecount[player] = 1 - else: - playerframecount[player] += 1 - for player in playerframecount.keys(): - if playerframecount.get(player) >= playerthresh: - player_list.append(player) - return player_list - - -def team_split(frames): - ''' - Input: - state: a StatState class that holds all sorts of information - on the video - Output: - best_team [tuple]: tuple of two sets of player ids that are the best - team split - pos_lst [list[tuple]]: list of player ids in the order of ball - possession with start and finish frames - ''' - player_list = get_playerlist(frames, playerthresh=100) - pos_lst = possession_list(frames, player_list, thresh=11) - player_idx = {player: i for i, player in enumerate(player_list)} - connects = connections(pos_lst, player_list, player_idx) - teams = possible_teams(player_list) - best_team = None - min_count = 100000 - for team in teams: - count = 0 - team1 = list(team[0]) - team2 = list(team[1]) - for player1 in team1: - for player2 in team2: - count += (connects[player_idx.get(player1)] - [player_idx.get(player2)]) - count += (connects[player_idx.get(player2)] - [player_idx.get(player1)]) - if count < min_count: - min_count = count - best_team = team - return best_team, pos_lst, player_list - - -def compute_possession(player_pos, team1) -> Tuple[float, float]: - """ - Input: player possession, list of players on team 1 - Computes and returns team1 possession, team2 possession. - """ - # total frames of each team's possession - team1_pos = 0 - team2_pos = 0 - for player, pos in player_pos.items(): - for intervals in pos: - pos_time = intervals[1] - intervals[0] - if player in team1: - team1_pos += pos_time - else: - team2_pos += pos_time - total_pos = team1_pos + team2_pos - - return team1_pos / total_pos, team2_pos / total_pos diff --git a/src/processing/video_render.py b/src/processing/video_render.py deleted file mode 100644 index 6fc7ce1c..00000000 --- a/src/processing/video_render.py +++ /dev/null @@ -1,128 +0,0 @@ -""" -Video Rendering module for courtline detection and video reencoding. -""" -import cv2 as cv -import random -import os -import numpy as np -import subprocess -import sys - -class VideoRender: - def __init__(self, homography): - self._TRUE_PATH = os.path.join('data','true_map.png') - self._TRUTH_COURT_MAP = cv.imread(self._TRUE_PATH,cv.IMREAD_GRAYSCALE) - self._HOMOGRAPHY = homography - - - def reencode(self, input_path, output_path): - """ - Re-encodes a MPEG4 video file to H.264 format. Overrides existing output videos if present. - Deletes the unprocessed video when complete. - Ensures ffmpeg dependency is installed - """ - - if sys.platform != 'darwin': - print("Designed to install dependency for macOS") - else: - try: - # check if it is installed already - subprocess.run(["brew", "list", "ffmpeg"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - except: - # install dependency - print("Installing ffmpeg") - try: - subprocess.run(["brew", "install", "ffmpeg"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - except: - print("Error installing ffmpeg") - return - - reencode_command = f'ffmpeg -y -i {input_path} -vcodec libx264 -c:a copy {output_path}' - os.system(reencode_command) - # os.remove(input_path) - - - def render_video(self,states:list,players:dict,filename:str,fps:int=30): - ''' - Takes into player position data, applied homography, - and renders video stored in filename - @param states, list of dictionaries, - each represent a frame with state info in chronological order - @param players, dictionary of players where keys are players - @param filename, file path from project root where video is saved - @param fps, frames per second expected of produced video - ''' - players=players.keys() - # Create a blank image to use as the background for each frame - background = cv.cvtColor(self._TRUTH_COURT_MAP,cv.COLOR_GRAY2BGR) - height, width, _ = background.shape - - # Initialize the video writer - fourcc = cv.VideoWriter_fourcc(*'mp4v') - video_writer = cv.VideoWriter(filename, fourcc, fps, (width,height)) - - # Define initial positions for each player - player_state = {} - for player in players: - player_state.update({player:{'pos':(0,0), - 'color':(random.randint(0,256),random.randint(0,256),random.randint(0,256))}}) - - # find duration of video - dur = states[-1]["frameno"] - states += [{"frameno":dur+fps,"players":{}}] - frame_index = 0 - # Loop through each time step - for t in range(1,dur+10): - # Create a copy of the background image to draw the points on - frame = background.copy() - - # Get dictionary of positions at each frame - while (states[frame_index]["frameno"]<=t): - state = states[frame_index] - player_info = state['players'] - for player in players: - if player in player_info: - pd = player_info[player] - ps = player_state[player] - x, y = (pd['xmin']+pd['xmax'])/2.0, pd['ymax']-5 - x1, y1 = self._transform_point(x,y) - x0, y0 = ps['pos'] - x1, y1 = (2*x1+x0)/3.0, (2*y1+y0)/3.0 - ps.update({'pos':(x1, y1)}) - if frame_index>=len(states)-2>= 0 or states[frame_index+1]["frameno"] > t: # release if at end of contig - break - frame_index += 1 - - - # Loop through each point and draw it on the frame - for player in players: - pos = player_state[player]['pos'] - pos = (int(pos[0]),int(pos[1])) - color = player_state[player]['color'] - font = cv.FONT_HERSHEY_SIMPLEX - thickness = 2 - font_scale = 1 - radius = 10 - text_width = cv.getTextSize(player, font, font_scale, thickness)[0][0] - cv.circle(img=frame, center=pos, radius=radius, color=color, thickness=-1) - cv.putText(img=frame,text=player,org=(pos[0]-(text_width//2),pos[1]-radius-10), - fontFace=font,fontScale=font_scale,color=color,thickness=thickness,lineType=cv.LINE_AA) - - # Write the frame to the video writer - video_writer.write(frame) - - # Release the video writer - video_writer.release() - - - def _transform_point(self,x:float,y:float): - ''' - Applies court homography to single point - @param x,y pixel positions of point on court video - @returns transformed pixels x,y positions on true court - ''' - point = np.array([x, y], dtype=np.float32) - point = point.reshape((1, 1, 2)) - transformed_point = cv.perspectiveTransform(point, self._HOMOGRAPHY) - tx, ty = transformed_point[0, 0] - return tx, ty diff --git a/src/processrunner.py b/src/processrunner.py index 712d3096..b458ecca 100644 --- a/src/processrunner.py +++ b/src/processrunner.py @@ -1,80 +1,66 @@ """ Runner module for processing and statistics """ +import state from state import GameState -from processing import general_detect, team_detect, shot_detect, courtline_detect, video_render +from processing import parse, court, render, shot, team + + class ProcessRunner: """ Runner class taking in: original video file path, 2 model output files, render destination path Performs player, team, shot, and courtline detection in sequence. Effect: updates GameState with statistics and produces courtline video. """ - def __init__(self, video_path, players_tracking, ball_tracking, output_video_path, - output_video_path_reenc) -> None: + + def __init__( + self, + video_path, + players_tracking, + ball_tracking, + output_video_path, + output_video_path_reenc, + ): self.video_path = video_path self.players_tracking = players_tracking self.ball_tracking = ball_tracking self.output_video_path = output_video_path self.output_video_path_reenc = output_video_path_reenc - self.state = GameState() - + self.state: GameState = GameState() - def run_general_detect(self): - """Runs various detection modules that updates GameState's rim states, ball""" - rim_info, frames = general_detect.parse_output(self.state, self.players_tracking) - self.state.rim, self.state.states = rim_info, frames - general_detect.parse_ball(self.state, self.ball_tracking) + def run_parse(self): + "Runs parse module over SORT (and pose later) outputs to update GameState" + parse.parse_sort_output(self.state, self.players_tracking) + parse.parse_sort_output(self.state, self.ball_tracking) + def run_possession(self): + self.state.filter_players(threshold=100) + self.state.recompute_possession_list(threshold=20, join_threshold=20) + self.state.recompute_pass_from_possession() def run_team_detect(self): - """ - TODO figure out how to decouple team and general processing more - TODO explain pos_list - Splits identified players into teams, then curates: - ball state, passes, player possession, and team possession - """ - teams, pos_list, playerids = team_detect.team_split(self.state.states) - self.state.possession_list = pos_list - for pid in playerids: - self.state.players[pid] = {'shots': 0, "points": 0, "rebounds": 0, "assists": 0} - self.state.ball_state = general_detect.ball_state_update(pos_list, - len(self.state.states) - 1) - self.state.passes = general_detect.player_passes(pos_list) - self.state.possession = general_detect.player_possession(pos_list) - - self.state.team1 = teams[0] - self.state.team2 = teams[1] - - self.state.team1_pos, self.state.team2_pos = team_detect.compute_possession( - self.state.possession, self.state.team1) - + team.split_team(self.state) def run_shot_detect(self): - """Runs shot detection and updates scores.""" - #TODO figure out madeshot and resolve conflict in state & takuma module - made_shots = shot_detect.madeshot(self.players_tracking) - self.state.update_scores(made_shots) - + shot.shots(self.state, window=5) def run_courtline_detect(self): """Runs courtline detection.""" - court = courtline_detect.Render(self.video_path) - self.homography = court.get_homography() - + c = court.Render(self.video_path) + self.homography = c.get_homography() def run_video_render(self): """Runs video rendering and reencodes, stores to output_video_path_reenc.""" - videoRender = video_render.VideoRender(self.homography) - videoRender.render_video(self.state.states, self.state.players, self.output_video_path) - videoRender.reencode(self.output_video_path, - self.output_video_path_reenc) - + videoRender = render.VideoRender(self.homography) + videoRender.render_video(self.state, self.output_video_path) + videoRender.reencode(self.output_video_path, self.output_video_path_reenc) def run(self): """ Runs all processing and statistics. """ - self.run_general_detect() + self.run_parse() + self.run_possession() self.run_team_detect() self.run_shot_detect() print('G, T, S detect fine') @@ -83,9 +69,8 @@ def run(self): self.run_video_render() print('video render fine') - def get_results(self): """ Returns string of processed statistics. """ - return repr(self.state) + return str(state.todict(self.state)) diff --git a/src/state.py b/src/state.py index 7780e280..a642e384 100644 --- a/src/state.py +++ b/src/state.py @@ -2,60 +2,504 @@ Module containing state of game statistics """ from enum import Enum +import sys -class BallState(Enum): + +# maintains dictionary functionality, if desired: +def todict(obj): + "to dictionary, recursively" + if isinstance(obj, dict): + result = {} + for key, value in obj.items(): + result[key] = todict(value) # Recursive call for dictionary values + return result + elif hasattr(obj, "__dict__"): + return todict(obj.__dict__) # Recursive call for objects with __dict__ + elif isinstance(obj, list): + return [todict(item) for item in obj] # Recursive call for list items + else: + return obj # Base case: return the original value + + +# of importance for SORT object type enumerating +class ObjectType(Enum): + "type of object detected in object tracking" + BALL = 0 + PLAYER = 1 + RIM = 2 + + +class Box: + """ + Bounding box containing + xmin, ymin, xmax, ymax of bounding box + """ + + def __init__(self, xmin: int, ymin: int, xmax: int, ymax: int) -> None: + """ + Initializes bounding box + """ + # IMMUTABLE + self.xmin: int = xmin + self.ymin: int = ymin + self.xmax: int = xmax + self.ymax: int = ymax + + def area(self) -> int: + "area of bounding box" + if self.check(): + return (self.xmax - self.xmin) * (self.ymax - self.ymin) + else: + return 0 + + def point(self, xrel: float, yrel: float) -> tuple: + "(x,y) point relative to scaling. Requires Well-Defined Box" + if self.check(): + x = int(xrel * (self.xmax - self.xmin)) + y = int(yrel * (self.ymax - self.ymin)) + return (x, y) + else: + raise Exception("box not well-defined") + + def inbounds(self, x: int, y: int) -> bool: + "if (x,y) within bounding box" + return x >= self.xmin and x <= self.xmax and y >= self.ymin and y <= self.ymax + + def area_of_intersection(self, box): + inter: Box = Box( + xmin=max(box.xmin, self.xmin), + ymin=max(box.ymin, self.ymin), + xmax=min(box.xmax, self.xmax), + ymax=min(box.ymax, self.ymax), + ) + return inter.area() + + def contains(self, box) -> bool: + return self.area_of_intersection(box) == box.area() + + def intersects(self, box) -> bool: + return self.area_of_intersection(box) != 0 + + def check(self) -> bool: + "verifies if well-defined" + try: + assert self.xmin <= self.xmax and self.ymin <= self.ymax + assert ( + self.xmin is not None + and self.ymin is not None + and self.xmax is not None + and self.ymax is not None + ) + except: + return False + return True + + +class ShotType(Enum): + "Status of shot, paired with point value" + MISS = 0 + TWO = 2 + THREE = 3 + + +class ShotAttempt: + "Shot Attempt object" + + def __init__(self, playerid: str, start: int, end: int) -> None: + """ + A single shot attempt containing + playerid: shot's player + start: first frame + end: last frame + made: whether it was made + frame: frameno if it was made + type: MISS, TWO, or THREE + """ + # IMMUTABLE + self.playerid: str = playerid + "player i of shot attempt" + self.start: int = start + "first frame" + self.end: int = end + "last frame" + + # MUTABLE + self.made: bool = False + self.frame: int = None + "frame shot was made, if applicable" + self.type: ShotType = ShotType.MISS + "MISSED, TWO, or THREE" + + def value(self) -> int: + "point value of shot attempt" + return self.type.value + + def check(self) -> bool: + "verifies if well-defined" + try: + assert self.start <= self.end + assert ( + self.start is not None + and self.end is not None + and self.type is not None + ) + except: + return False + return True + + +class BallType(Enum): """ Indicates the status of the ball at any given frame. """ + IN_POSSESSION = 1 # hold/dribble IN_TRANSITION = 2 # pass/shot OUT_OF_PLAY = 3 # out of bounds, just after shot, etc. +class BallFrame: + """ + Ball state containing + box: bounding box + playerid: of last posession + type: IN_POCESSION, IN_TRANSITION, or OUT_OF_PLAY + """ + + def __init__(self, xmin: int, ymin: int, xmax: int, ymax: int) -> None: + # IMMUTABLE + self.box: Box = Box(xmin, ymin, xmax, ymax) + "bounding box" + + # MUTABLE + self.playerid: int = None + "last player in possession" + self.type: BallType = None + "IN_POCESSION, IN_TRANSITION, or OUT_OF_PLAY" + + def check(self) -> bool: + "verifies if well-defined" + try: + assert self.box.check() is True + assert self.playerid is not None and self.type is not None + except: + return False + return True + + +class ActionType(Enum): + "player actions type" + NOTHING = 0 # assumption: NOTHING is only action not with ball + DRIBBLE = 1 + PASS = 2 + SHOOT = 3 + + +class PlayerFrame: + """ + Player state containing + box: bounding box + playerid: of last posession + type: NOTHING, DRIBBLE, PASS, SHOOT + """ + + def __init__(self, xmin: int, ymin: int, xmax: int, ymax: int) -> None: + # IMMUTABLE + self.box: Box = Box(xmin, ymin, xmax, ymax) + "bounding box" + + # MUTABLE + self.ballid: int = -1 + "ball in possession (-1) if not in possession" + self.type: ActionType = None + "NOTHING, DRIBBLE, PASS, SHOOT" + + def check(self) -> bool: + "verifies if well-defined" + try: + assert self.box.check() is True + assert self.type is not None + if self.type is ActionType.NOTHING: + assert self.ballid == -1 + else: + assert self.ballid != -1 + except: + return False + return True + + +class Frame: + "Frame class containing frame-by-frame information" + + def __init__(self, frameno: int) -> None: + """ " + Instantiates Frame containing fields + frameno: frame number + players: dictionary of players info during frame + balls: dictionary of balls info during frame + rim: bounding box of rim + possession: player in possession of ball + """ + # IMMUTABLE + self.frameno: int = frameno + "frame number, Required: non-negative integer" + + # MUTABLE + self.players: dict[str, PlayerFrame] = {} # ASSUMPTION: MULITPLE PEOPLE + "dictionary of form {player_[id] : PlayerFrame}" + self.ball: BallFrame = None # ASSUMPTION: SINGLE BALLS + "dictionary of form {ball_[id] : BallFrame}" + self.rim: Box = None # ASSUMPTION: SINGLE RIM + "bounding box of rim" + self.possesions: list[str] = [] + "player in possession of ball" + + def add_player_frame(self, id: int, xmin: int, ymin: int, xmax: int, ymax: int): + "update players in frame given id and bounding boxes" + pf = PlayerFrame(xmin, ymin, xmax, ymax) + id = "player_" + str(id) + self.players.update({id: pf}) + + def set_ball_frame(self, id: int, xmin: int, ymin: int, xmax: int, ymax: int): + "set ball in frame given id and bounding boxes" + bf = BallFrame(xmin, ymin, xmax, ymax) + id = "ball_" + str(id) + self.ball = bf + + def set_rim_box(self, id: int, xmin: int, ymin: int, xmax: int, ymax: int): + "set rim box given bounding boxes" + r = Box(xmin, ymin, xmax, ymax) + self.rim = r + + def calculate_possesions(self): + "calculate player in possession of ball, set to possession, None is none" + bbox: Box = self.ball.box + max_p: set = set() + max_a = 1 + for p, pf in enumerate(self.players): + a = bbox.area_of_intersection(pf.box) + if a == max_a: + max_p.add(p) + elif a > max_a: + max_a = a + max_p = set() + max_p.add(p) + self.possesions = max_p + + def check(self) -> bool: + "verifies if well-defined" + try: + assert ( + self.frameno is not None + and self.players is not None + and self.ball is not None + and self.rim is not None + ) + for pf in self.players.values(): + assert pf.check() + assert self.ball.check() + assert self.rim.check() + except: + return False + return True + + +class PlayerState: + "State object containing player information throughout whole game" + + def __init__(self) -> None: + """ + Player state containing + frames: number frames player appeared in + + """ + # MUTABLE + self.frames: int = 0 + + +class BallState: + "State object containing ball information throughout whole game" + + def __init__(self) -> None: + """ + Ball state containing ball stuff + + """ + # MUTABLE + self.frames: int = 0 + + +class Interval: + "Object of interval when certain player contains a ball" + + def __init__(self, playerid, start, end) -> None: + """ + Interval obj containing + playerid: id of player in possession + start: start frame (inclusive) + end: end frame (inclusive) + frames: iterable range over interval + """ + # IMMUTABLE + self.playerid: str = playerid + self.start: int = start + self.end: int = end + self.length: int = end - start + self.frames = range(start, end) # when want to iterate through frames + + def check(self) -> bool: + "verifies if well-defined" + try: + assert ( + self.playerid is not None + and self.start is not None + and self.frames is not None + and self.frames is not None + ) + assert self.start <= self.end + except: + return False + return True + + class GameState: """ State class holding: player positions, ball position, and team scores """ + def __init__(self) -> None: """ Initialises state; contains the following instance variables: - rim: rim position - backboard: backboard position TODO - states: list of dictionaries with info at each frame - possession_list: list of ball possession tuples - passes: dictionary of passes with their start and end frames and players involved - possession: dictionary of players with their possessions as list of frame tuples - team1, team2: list of players on each team - score1, score2: score of each team - team1_pos, team2_pos: percentage of possession for each team + frames: list of PlayerFrame + players: dictionary of PlayerState + ball: BallState + possessions: list of PossessionInterval + passes: dictionary of passes + shots: list of shots by player + team1: set of players on one team + team2: est of players on other team """ - # IMMUTABLE - self.rim = None - self.backboard = None - # MUTABLE - # [{'frameno': #, 'ball': {xmin, xmax, ymin, ymax}, 'playerid'...}] - self.states = None - # [(start_frame, end_frame, TODO something)] - self.possession_list = None - # {'playerid': {'shots': 0, "points": 0, "rebounds": 0, "assists": 0}} - self.players = {} - # [(start_frame, end_frame, BallState)] - self.ball_state = None - # {'pass_id': {'frames': (start_frame, end_frame)}, 'players':(p1_id, p2_id)}} - self.passes = None - # {'player_id': [(start_frame, end_frame), ...]} - self.possession = None - self.team1 = None - self.team2 = None - - # statistics - self.score1 = 0 - self.score2 = 0 - self.team1_pos = 0 - self.team2_pos = 0 + self.frames: list[Frame] = [] + "list of frames: [Frame], each frame has player, ball, and rim info" + + self.players: dict[str, PlayerState] = {} + "Global player data: {player_0 : PlayerState, player_1 : PlayerState}" + + self.ball: BallState = BallState() + "Global ball data" + + self.possessions: list[Interval] = [] + "[PossessionInterval]" + + self.passes: dict[str, dict[int]] = {} + "dictionary of passes {player_0 : {player_0 : 3}}" + + self.shots: list[Interval] = [] + " list of shots: [(player_[id],start,end)]" + + self.team1: set = set() + self.team2: set = set() + + def recompute_frame_count(self): + "recompute frame count of all players in frames" + for ps in self.players.values(): # reset to 0 + ps.frames = 0 + for frame in self.frames: + for pid in frame.players: + if pid not in self.players: + self.players.update({pid: PlayerState()}) + self.players.get(pid).frames += 1 + + def recompute_possession_list(self, threshold=20, join_threshold=20): + """ + Recompute posssession list with frame possession minimum [threshold]. + Requires at least one frame + """ + lst = [] + prev = None + while lst != prev: # until lists have converged + prev = lst.copy() + self.grow_poss(lst) + self.join_poss(lst, threshold) + self.filter_poss(lst, join_threshold) + self.possessions = lst + + def grow_poss(self, lst: list) -> None: + """ + modifies posssession list [lst] with more possession, if avaiable + + """ + i = 0 + fi = 0 + while fi < len(self.frames): + f: Frame = self.frames[fi] + frame = f.frameno + poss = f.possesions + + if i >= len(lst): + s = sys.maxsize # ensures new poss frame added + else: + pi: Interval = lst[i] # assume well-defined + s = pi.start + + if len(poss) == 0 or s <= frame: # skip when nothing + fi += 1 # next frame + continue + else: # frame < start + p = poss.pop() # arbitrary player + lst.insert(i, (p, frame, frame)) + i += 1 # next interval + + def join_poss(self, lst: list, threshold: int = 20): # possiblility of mapreduce + "modifies posssession list to join same player intervals within threshold frames" + i = 0 + while i < len(lst) - 1: + p1: Interval = lst[i] + p2: Interval = lst[i + 1] + + if p1.playerid is p2.playerid and p2.start - p1.end <= threshold: + lst.pop(i) + lst.pop(i) + p = Interval(p1.playerid, p1.start, p2.end) + lst.insert(i, p) + else: + i += 1 # next interval pair + + def filter_poss(self, lst: list, threshold: int = 20): + "modifies posssession list to join same player intervals within threshold frames" + i = 0 + while i < len(lst): + p: Interval = lst[i] + if p.length < threshold or p.playerid not in self.players: + lst.pop(i) + else: + i += 1 # next interval + + def filter_players(self, threshold: int): + "removes all players which appear for less than [threshold] frames" + self.recompute_frame_count() + for k in list(self.players.keys()): + v: PlayerState = self.players.get(k) + if v.frames < threshold: + self.players.pop(k) + + def recompute_pass_from_possession(self): + "Recompute passes naively from possession list" + self.passes: dict = {} # reset pass dictionary + for p in self.players: + self.passes.update({p: {}}) + for c in self.players: + self.passes.get(p).update({c: 0}) + + i = 0 + for i in range(len(self.possessions) - 1): + p1 = self.possessions[i].playerid + p2 = self.possessions[i + 1].playerid + self.passes[p1][p2] += 1 + ## TODO Update for backend def update_scores(self, madeshot_list): """ TODO check for correctness + potentially move out of state.py @@ -69,47 +513,48 @@ def update_scores(self, madeshot_list): Updates self.score1, self.score2. """ madeshots = [] - madeshot_lst = [] + mdsh_lst = [] # Set counter to first made shot (where madeshot_list[counter][1] != 0) counter = 0 for shot in madeshot_list: if shot[1] != 0: - madeshot_lst.append(shot) + mdsh_lst.append(shot) # Iterate through possession list and find who made the shot # TODO what if madeshot_lst is empty? for pos in self.possession_list: - if pos[2] >= madeshot_lst[counter][0]: - madeshots.append((pos[0], madeshot_lst[counter][0])) + if pos[2] >= mdsh_lst[counter][0]: + madeshots.append((pos[0], mdsh_lst[counter][0])) counter += 1 - if counter >= len(madeshot_lst): + if counter >= len(mdsh_lst): break # For each shot made update the player's and team's score for shot in madeshots: - self.players[shot[0]]['shots'] += 1 - self.players[shot[0]]['points'] += 2 + self.players[shot[0]]["shots"] += 1 + self.players[shot[0]]["points"] += 2 if shot[0] in self.team1: self.score1 += 2 else: self.score2 += 2 + # def __repr__(self) -> str: + # result_dict = { + # "Rim coordinates": str(self.rim) if len(self.rim) > 0 else "None", + # "Backboard coordinates": str(self.backboard) + # if len(self.rim) > 0 + # else "None", + # "Court lines coordinates": "None", + # "Number of frames": str(len(self.frames)), + # "Number of players": str(len(self.players)), + # "Number of passes": str(len(self.passes)), + # "Team 1": str(self.team1), + # "Team 2": str(self.team2), + # "Team 1 Score": str(self.score1), + # "Team 2 Score": str(self.score2), + # "Team 1 Possession": str(self.team1_pos), + # "Team 2 Possession": str(self.team2_pos), + # } + # for player in self.players: + # result_dict[player] = str(self.players[player]) - def __repr__(self) -> str: - result_dict = { - "Rim coordinates": str(self.rim) if len(self.rim) > 0 else "None", - "Backboard coordinates": str(self.backboard) if len(self.rim) > 0 else "None", - "Court lines coordinates": "None", - "Number of frames": str(len(self.states)), - "Number of players": str(len(self.players)), - "Number of passes": str(len(self.passes)), - "Team 1": str(self.team1), - "Team 2": str(self.team2), - "Team 1 Score": str(self.score1), - "Team 2 Score": str(self.score2), - "Team 1 Possession": str(self.team1_pos), - "Team 2 Possession": str(self.team2_pos) - } - for player in self.players: - result_dict[player] = str(self.players[player]) - - return str(result_dict) + # return str(result_dict) diff --git a/src/view/app.py b/src/view/app.py index 03d683da..e948244b 100644 --- a/src/view/app.py +++ b/src/view/app.py @@ -6,20 +6,21 @@ import hydralit_components as hc import pandas as pd import requests -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from main import main -st.set_page_config(page_title='HoopTracker', page_icon=':basketball:') +st.set_page_config(page_title="HoopTracker", page_icon=":basketball:") # 'set up tab tile and favicon' # Initialize Session States: # 0 = Default State : No Video Uploaded --> Prompts For Upload / Home Screen Demo # 1 = Uploading/Processing State --> Loading Screen # 2 = Done Processing, Show Statistics, Allow Exporting -if 'state' not in st.session_state: +if "state" not in st.session_state: st.session_state.state = 0 - st.session_state.logo = 'src/view/static/basketball.png' - with open('data/training_data.mp4', 'rb') as file: + st.session_state.logo = "src/view/static/basketball.png" + with open("data/training_data.mp4", "rb") as file: st.session_state.video_file = io.BytesIO(file.read()) st.session_state.processed_video = None st.session_state.result_string = None @@ -36,54 +37,61 @@ def process_video(video_file): ''' if video_file is None: return False - response = requests.post(SERVER_URL+"upload", - files={"video_file": video_file}, timeout=30) + response = requests.post( + SERVER_URL + "upload", files={"video_file": video_file}, timeout=30 + ) if response.status_code == 200: data = response.json() - st.session_state.upload_name = data.get('message') + st.session_state.upload_name = data.get("message") # temp fix - with open('tmp/user_upload.mp4', 'wb') as f: - #f.write(video_file.value) + with open("tmp/user_upload.mp4", "wb") as f: + # f.write(video_file.value) f.write(video_file.getvalue()) else: - print('error uploading file') # maybe make an error handler in frontend + print("error uploading file") # maybe make an error handler in frontend st.session_state.is_downloaded = False return True # Pages def main_page(): - ''' + """ Loads main page - ''' - st.markdown(''' + """ + st.markdown( + """ # HoopTracker A basketball analytics tracker built on YOLOv5 and OpenCV. Simply upload a video in the side bar and click "Process Video." - ''') + """ + ) # send to tips page st.button(label="Having Trouble?", on_click=change_state, args=(-1,)) # Basketball Icon Filler - _, col2, _ = st.columns([0.5,5,0.5]) + _, col2, _ = st.columns([0.5, 5, 0.5]) with col2: - st.image(image=st.session_state.logo,use_column_width=True) + st.image(image=st.session_state.logo, use_column_width=True) def loading_page(): - ''' + """ Loads loading page - ''' - st.markdown(''' + """ + st.markdown( + """ # Processing... Please wait while we upload and process your video. - ''') + """ + ) # Loading bar until video processes - with hc.HyLoader('', hc.Loaders.pulse_bars,): + with hc.HyLoader( + "", + hc.Loaders.pulse_bars, + ): process_video(video_file=st.session_state.video_file) - fetch_result_video() - fetch_result_string() + fetch_result() # Load results page when done change_state(2) @@ -91,183 +99,210 @@ def loading_page(): def results_page(): - st.markdown(''' + st.markdown( + """ # Results These are the results. Here's the processed video and a minimap of the player positions. - ''') - process_data = fetch_result_string() - st.video(open(fetch_result_video(), 'rb').read()) + """ + ) + st.video(open(st.session_state.processed_video, "rb").read()) - st.markdown('## Statistics') + st.markdown("## Statistics") process_results() st.download_button( - label='Download Results', + label="Download Results", use_container_width=True, - data=process_data, - file_name="results.txt" + data=st.session_state.result_string, + file_name="results.txt", ) - st.button(label='Back to Home', on_click=change_state, args=(0,),type="primary") + st.button(label="Back to Home", on_click=change_state, args=(0,), type="primary") + def tips_page(): - ''' + """ Loads tips page - ''' - st.markdown(''' + """ + st.markdown( + """ # Tips and Tricks Is our model having trouble processing your video? Here's some tips and tricks we have for you. * Position the camera on a stable surface, such as a tripod. * Ensure the players and hoop and the court are visible as much as possible. * We recommend at least 1080p video shot at 30 fps. - ''') + """ + ) # Back to Home - st.button(label='Back to Home', on_click=change_state, args=(0,)) + st.button(label="Back to Home", on_click=change_state, args=(0,)) def error_page(): - ''' + """ Loads error page - ''' - st.markdown(''' + """ + st.markdown( + """ # Error: Webpage Not Found Try reloading the page to fix the error. If there are any additional issues, please report errors to us [here](https://github.com/CornellDataScience/Ball-101). - ''') - st.button(label='Back to Home', on_click=change_state, args=(0,)) + """ + ) + st.button(label="Back to Home", on_click=change_state, args=(0,)) def setup_sidebar(): - '''Sets up sidebar for uploading videos''' + """Sets up sidebar for uploading videos""" # Display upload file widget - st.sidebar.markdown('# Upload') + st.sidebar.markdown("# Upload") file_uploader = st.sidebar.file_uploader( - label='Upload a video', - type=['mp4', 'mov', 'wmv', 'avi', 'flv', 'mkv'], - label_visibility='collapsed') + label="Upload a video", + type=["mp4", "mov", "wmv", "avi", "flv", "mkv"], + label_visibility="collapsed", + ) if file_uploader is not None: update_video(file_uploader) # Display video they uploaded - st.sidebar.markdown('# Your video') + st.sidebar.markdown("# Your video") st.sidebar.video(data=st.session_state.video_file) # Process options to move to next state - col1, col2 = st.sidebar.columns([1,17]) - consent_check = col1.checkbox(label=" ", label_visibility='hidden') - col2.caption(''' + col1, col2 = st.sidebar.columns([1, 17]) + consent_check = col1.checkbox(label=" ", label_visibility="hidden") + col2.caption( + """ I have read and agree to HoopTracker's [terms of services.](https://github.com/CornellDataScience/Ball-101) - ''') + """ + ) - st.sidebar.button(label='Upload & Process Video', - disabled= not consent_check, - use_container_width=True, - on_click=change_state, args=(1,), - type='primary') + st.sidebar.button( + label="Upload & Process Video", + disabled=not consent_check, + use_container_width=True, + on_click=change_state, + args=(1,), + type="primary", + ) # Helpers -def change_state(state:int): - ''' +def change_state(state: int): + """ Call back function to change page @param state, integer to change the state - ''' + """ st.session_state.state = state def update_video(video_file): - ''' + """ Updates video on screen @param video_file, file path to video - ''' + """ st.session_state.video_file = video_file -def fetch_result_video(): - ''' - Updates and returns the resulting video to be displayed. - TODO change to calling backend instead of accessing from repo - ''' - if st.session_state.processed_video is None: - st.session_state.processed_video = 'tmp/court_video_reenc.mp4' - return st.session_state.processed_video - - -def fetch_result_string(): - ''' +def fetch_result(): + """ Updates and returns the resulting statistics in string format. TODO change to calling backend instead of accessing from repo - ''' + """ # if st.session_state.result_string is None: # response = requests.get(SERVER_URL+f"download/{st.session_state.upload_name}", files= - # {'file_name': st.session_state.upload_name, 'download_path': + # {'file_name': st.session_state.upload_name, 'download_path': # 'tmp/user_upload.mp4'}, timeout=30) # if response.status_code == 200: # st.session_state.result_string = main('tmp/user_upload.mp4') # else: # print('error downloading file') # maybe make an error handler in frontend # st.session_state.result_string = main('data/training_data.mp4') - - st.session_state.result_string = main('tmp/user_upload.mp4') - return st.session_state.result_string + + st.session_state.result_string = main("tmp/user_upload.mp4") + st.session_state.processed_video = "tmp/court_video_reenc.mp4" def process_results(): - ''' + """ Processes st.session_state.result_string to display results - ''' - # Parse the result string into a dictionary - result_dict = ast.literal_eval(st.session_state.result_string) - - # Create dataframes for each category - general_df = pd.DataFrame({ - 'Number of frames': [result_dict['Number of frames']], - 'Number of players': [result_dict['Number of players']], - 'Number of passes': [result_dict['Number of passes']] - }) - - team_df = pd.DataFrame({ - 'Team 1': [result_dict['Team 1'], result_dict['Team 1 Score'], - result_dict['Team 1 Possession']], - 'Team 2': [result_dict['Team 2'], result_dict['Team 2 Score'], - result_dict['Team 2 Possession']] - }, index=['Players', 'Score', 'Possession']) - - coordinates_df = pd.DataFrame({ - 'Rim coordinates': [result_dict['Rim coordinates']], - 'Backboard coordinates': [result_dict['Backboard coordinates']], - 'Court lines coordinates': [result_dict['Court lines coordinates']], - }) - - # Create a dictionary for players data - players_dict = {key: value for key, value in result_dict.items() - if key not in general_df.columns and key not in team_df.columns and - key not in coordinates_df.columns} - # Remove team score and possession details from players dictionary - team_keys = ['Team 1 Score', 'Team 2 Score', 'Team 1 Possession', 'Team 2 Possession'] - players_dict = {key: value for key, value in players_dict.items() if key not in team_keys} - - # Convert player statistics from string to dictionary - for player, stats in players_dict.items(): - players_dict[player] = ast.literal_eval(stats) - - players_df = pd.DataFrame(players_dict).T - - # Display dataframes - st.write("### General") - st.dataframe(general_df) - - st.write("### Team") - st.dataframe(team_df) - - st.write("### Players") - st.dataframe(players_df) - - st.write("### Coordinates") - st.dataframe(coordinates_df) + """ + # TODO revamp results processing + # # Parse the result string into a dictionary + # result_dict = ast.literal_eval(st.session_state.result_string) + + # # Create dataframes for each category + # general_df = pd.DataFrame( + # { + # "Number of frames": [result_dict["Number of frames"]], + # "Number of players": [result_dict["Number of players"]], + # "Number of passes": [result_dict["Number of passes"]], + # } + # ) + + # team_df = pd.DataFrame( + # { + # "Team 1": [ + # result_dict["Team 1"], + # result_dict["Team 1 Score"], + # result_dict["Team 1 Possession"], + # ], + # "Team 2": [ + # result_dict["Team 2"], + # result_dict["Team 2 Score"], + # result_dict["Team 2 Possession"], + # ], + # }, + # index=["Players", "Score", "Possession"], + # ) + + # coordinates_df = pd.DataFrame( + # { + # "Rim coordinates": [result_dict["Rim coordinates"]], + # "Backboard coordinates": [result_dict["Backboard coordinates"]], + # "Court lines coordinates": [result_dict["Court lines coordinates"]], + # } + # ) + + # # Create a dictionary for players data + # players_dict = { + # key: value + # for key, value in result_dict.items() + # if key not in general_df.columns + # and key not in team_df.columns + # and key not in coordinates_df.columns + # } + # # Remove team score and possession details from players dictionary + # team_keys = [ + # "Team 1 Score", + # "Team 2 Score", + # "Team 1 Possession", + # "Team 2 Possession", + # ] + # players_dict = { + # key: value for key, value in players_dict.items() if key not in team_keys + # } + + # # Convert player statistics from string to dictionary + # for player, stats in players_dict.items(): + # players_dict[player] = ast.literal_eval(stats) + + # players_df = pd.DataFrame(players_dict).T + + # # Display dataframes + # st.write("### General") + # st.dataframe(general_df) + + # st.write("### Team") + # st.dataframe(team_df) + + # st.write("### Players") + # st.dataframe(players_df) + + # st.write("### Coordinates") + # st.dataframe(coordinates_df) # Entry Point diff --git a/tmp/court_video.mp4 b/tmp/court_video.mp4 deleted file mode 100644 index 33a4bc47..00000000 Binary files a/tmp/court_video.mp4 and /dev/null differ