bot.py
import tkinter as tk
import threading
import cv2
import numpy as np
import pyautogui
import time
import torch
from hubconf import custom
import pandas as pd
from deep_sort_realtime.deepsort_tracker import DeepSort
import random
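
# A small Tkinter-controlled screen-capture bot: it grabs frames from a named
# window, runs a custom YOLO model to detect "agility-hotspot" and "xp" objects,
# tracks hotspots with DeepSort, uses background subtraction to tell whether the
# avatar is moving, and clicks the tracked hotspot nearest the frame centre.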
def extract_bounding_boxes(predictions):
    bboxes, confidences, classNames = [], [], []
    for item in predictions:
        df = pd.DataFrame(item)
        df.columns = ['xmin', 'ymin', 'xmax', 'ymax', 'confidence', 'class', 'name']
        for index, detection in df.iterrows():
            xmin, ymin, xmax, ymax = map(int, detection[['xmin', 'ymin', 'xmax', 'ymax']])
            bbox = [xmin, ymin, xmax, ymax]
            bboxes.append(bbox)
            confidences.append(detection['confidence'])
            classNames.append(detection['name'])
    return bboxes, confidences, classNames
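
# Example (hypothetical values): the three returned lists line up by index:
#   bboxes, confidences, classNames = extract_bounding_boxes(results.pandas().xyxy)
#   bboxes[0]       -> [120, 80, 180, 140]   # (xmin, ymin, xmax, ymax)
#   confidences[0]  -> 0.87
#   classNames[0]   -> "agility-hotspot"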
def sort_tracks_by_distance(coordsOfCenterPoint, tracks):
    sorted_tracks_dict = {}
    center_point = np.array([coordsOfCenterPoint[0], coordsOfCenterPoint[1]])
    for track in tracks:
        if not track.is_confirmed():
            continue
        # track.to_ltrb() is a method of the DeepSort track object that returns
        # the track's current bounding box as (left, top, right, bottom)
        # coordinates, i.e. the tracked object's current position in the image.
        ltrb = track.to_ltrb()
        # calculate the center point for each bounding box
        bbox_center = np.array([(ltrb[0] + ltrb[2]) / 2, (ltrb[1] + ltrb[3]) / 2])
        distance = np.linalg.norm(center_point - bbox_center)
        sorted_tracks_dict[track.track_id] = {"ltrb": ltrb, "bbox_center": bbox_center, "distance": distance}
    # Sort the dictionary by distance and return a list of its items
    sorted_items = sorted(sorted_tracks_dict.items(), key=lambda x: x[1]["distance"])
    return sorted_items
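
# Usage sketch (hypothetical values): with a frame centre of (450, 300), the
# first item of the returned list is the confirmed track nearest the avatar:
#   nearest_id, nearest_data = sort_tracks_by_distance((450, 300), tracks)[0]
#   nearest_data["distance"]  # smallest Euclidean distance to the frame centre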
class WindowCapture:
    def __init__(self):
        self.recording = False
        self.clicked_hotspots = []  # so we can avoid clicking the same hotspot over and over again
        self.tracker = DeepSort(max_age=4, max_iou_distance=1, n_init=3,
                                nn_budget=10)
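        # DeepSort parameters: max_age is how many frames a track survives
        # without a matching detection, n_init is how many consecutive hits
        # confirm a track, max_iou_distance gates the IoU association step,
        # and nn_budget caps the stored appearance embeddings per class.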
        # information on the capture window
        self.top_left_x_window_capture = None
        self.top_left_y_window_capture = None
        self.width_window_capture = None
        self.height_window_capture = None
        self.frame_center = None
        # threads
        self.recording_thread = None
        # time stamps for when xp is detected
        self.xp_detected_log = []
    def move_mouse_and_click(self, data, window_position):
        # Sometimes, when traversing the rooftop after using an agility-hotspot,
        # the old hotspot remains the closest tracked object; clicking the same
        # spot over and over is not desirable, so already-clicked IDs are skipped.
        for obj_id, obj_data in data:
            if obj_id in self.clicked_hotspots:
                # Skip over objects that have already been clicked
                print(f"{obj_id} already clicked")
                continue
            print(f"{obj_id} clicking and added to the list")
            self.clicked_hotspots.append(obj_id)
            print("list of clicked_hotspots")
            print(self.clicked_hotspots)
            # Click near the object's centre with a small random offset and
            # remember its obj_id in clicked_hotspots
            bbox_width = obj_data["ltrb"][2] - obj_data["ltrb"][0]
            bbox_height = obj_data["ltrb"][3] - obj_data["ltrb"][1]
            random_x_offset = bbox_width * random.uniform(-0.15, 0.15)
            random_y_offset = bbox_height * random.uniform(-0.15, 0.15)
            random_point = (round(obj_data["ltrb"][0] + bbox_width / 2 + random_x_offset),
                            round(obj_data["ltrb"][1] + bbox_height / 2 + random_y_offset))
            # translate from capture-frame coordinates to absolute screen coordinates
            random_point = (random_point[0] + window_position.topleft[0],
                            random_point[1] + window_position.topleft[1])
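            # Example of the offset range: for a 100x60 px bounding box the click
            # lands within roughly ±15 px horizontally and ±9 px vertically of its centre.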
            original_position = pyautogui.position()
            pyautogui.moveTo(random_point)
            pyautogui.click()
            pyautogui.moveTo(original_position)
            break
    def start_recording(self, window_title):
        self.recording = True
        self.recording_thread = threading.Thread(target=self._record_screen, args=(window_title,), daemon=True)
        self.recording_thread.start()

    def stop_recording(self):
        self.recording = False
    def _record_screen(self, window_title):
        # getWindowsWithTitle returns a list; indexing an empty list would raise
        # an IndexError, so check for a match before grabbing the first window
        windows = pyautogui.getWindowsWithTitle(window_title)
        if not windows:
            print("Window not found")
            return
        window_pos = windows[0]
        if window_pos.isMinimized:
            window_pos.restore()
        top_left_x = window_pos.topleft[0]
        top_left_y = window_pos.topleft[1]
        width = window_pos.width
        height = window_pos.height
        # set our capture window info for calculating and storing the center point
        self._set_window_variables(top_left_x=top_left_x, top_left_y=top_left_y, width=width, height=height)
        # Create OpenCV window
        cv2.namedWindow("Bot", cv2.WINDOW_NORMAL)
        cv2.resizeWindow("Bot", 900, 600)
        can_click = True
        last_click_time = time.time()
        # timing variables for periodically clearing out self.clicked_hotspots.
        # In a misclick situation we may occasionally want to click an object
        # that has already been clicked; misclicks happen in real life, so it is
        # somewhat beneficial that they happen every so often in our application.
        last_clear_timer = time.time()
        last_log_clear = time.time()
        xp_detection_delay = 1
        # Background subtractor with a 250-frame history
        # (args: history, varThreshold, detectShadows)
        history = 250
        bg_subtractor = cv2.createBackgroundSubtractorMOG2(history, 10, False)
        while self.recording:
            img = pyautogui.screenshot(region=(top_left_x, top_left_y, width, height))
            # keep a second copy of the frame for the background-subtraction mask
            frame = np.array(img)
            frame2 = np.array(img)
            # PIL screenshots are RGB; swap channels so OpenCV (BGR) displays them correctly
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            frame2 = cv2.cvtColor(frame2, cv2.COLOR_RGB2BGR)
            # Show where the hardcoded avatar position is
            cv2.circle(frame, self.frame_center, 5, (255, 255, 0), -1)
            results = model(img, size=640)
            predictions = results.pandas().xyxy
            bboxes, confidences, classNames = extract_bounding_boxes(predictions)
            # shape the bounding box info for the DeepSort tracker; only the
            # agility-hotspot class needs tracking here
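            # deep_sort_realtime expects raw detections as
            # ([left, top, width, height], confidence, class_name) tuples,
            # hence the conversion from (xmin, ymin, xmax, ymax) below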
            formatted_bbs = [([box[0], box[1], box[2] - box[0], box[3] - box[1]], confidence, class_name)
                             for box, confidence, class_name in zip(bboxes, confidences, classNames)
                             if class_name == "agility-hotspot" and confidence > 0.6]
            xp_bounding_box = [(box, confidence, class_name)
                               for box, confidence, class_name in zip(bboxes, confidences, classNames)
                               if class_name == "xp" and confidence > 0.3]
            tracks = self.tracker.update_tracks(formatted_bbs, frame=frame)
            tracked_objects_sorted = sort_tracks_by_distance(self.frame_center, tracks)
            # display the tracking info in the frame
            for i, (track_id, track_data) in enumerate(tracked_objects_sorted):
                ltrb = track_data["ltrb"]
                bbox_center = track_data["bbox_center"]
                x1, y1, x2, y2 = map(int, ltrb)
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                cv2.putText(frame, f"ID: {track_id} agility spot", (x1, y1 - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
                # green line to the nearest hotspot, red lines to the rest
                if i == 0:
                    line_color = (0, 255, 0)
                else:
                    line_color = (0, 0, 255)
                cv2.line(frame, tuple(map(int, self.frame_center)), tuple(map(int, bbox_center)), line_color, 2)
            for detection in xp_bounding_box:
                x1, y1, x2, y2 = map(int, tuple(detection[0]))
                cv2.rectangle(frame, (x1, y1), (x2, y2), (230, 216, 173), 2)
                cv2.putText(frame, f"{detection[2]}", (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (230, 216, 173), 2)
            # Apply background subtraction to get the foreground mask
            fg_mask = bg_subtractor.apply(frame2)
            # Count the number of white (changed) pixels in the foreground mask
            pixel_count = fg_mask.shape[0] * fg_mask.shape[1]
            white_pixels = cv2.countNonZero(fg_mask)
            # Treat the avatar as moving if a certain percentage of the pixels changed
            if white_pixels / pixel_count > 0.1:  # change the threshold value as needed
                text = "Running"
                cv2.putText(frame, text, (10, frame.shape[0] - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 0, 255), 2, cv2.LINE_AA)
                can_click = False
            else:
                text = "Stationary"
                cv2.putText(frame, text, (10, frame.shape[0] - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2, cv2.LINE_AA)
                can_click = True
            # Record the time the first xp object is detected, then set can_click
            # only after a short delay: when xp first appears the screen is still
            # moving from the game's movement animations, so by the time the cursor
            # reaches the bounding box the screen has shifted and we miss the target.
            if len(xp_bounding_box) > 0:
                self.xp_detected_log.append(time.time())
            if len(xp_bounding_box) == 0 and time.time() - last_log_clear > 2:
                self.xp_detected_log.clear()
                last_log_clear = time.time()
            if len(self.xp_detected_log) > 0:
                if time.time() - self.xp_detected_log[0] > xp_detection_delay:
                    if time.time() - last_click_time > 3:
                        self.move_mouse_and_click(tracked_objects_sorted, window_pos)
                        last_click_time = time.time()
            if can_click:
                if time.time() - last_click_time > 3:
                    self.move_mouse_and_click(tracked_objects_sorted, window_pos)
                    last_click_time = time.time()
            # Periodically clear the list that prevents us from clicking the same
            # spot twice within a time threshold: sometimes while traversing an
            # obstacle that same obstacle is still the closest one, and we don't
            # want to keep trying to go backwards.
            if time.time() - last_clear_timer > 19:
                # clearing the list every 19 seconds
                self.clicked_hotspots.clear()
                last_clear_timer = time.time()
            cv2.imshow("Bot", frame)
            # Show the foreground mask
            cv2.imshow("Foreground Mask", fg_mask)
            if cv2.waitKey(1) == ord("q"):
                cv2.destroyAllWindows()
                self.recording = False
                break
        cv2.destroyAllWindows()
    def _set_window_variables(self, top_left_x, top_left_y, width, height):
        # Tweak these percentages to hard-code the avatar position when zoomed out.
        # There currently seems to be a small mismatch between pyautogui and the
        # pixel coordinates: the y value needs to sit slightly further down the
        # window, i.e. be larger, since y coordinates start at the top of the
        # screen and grow downward.
        center_x_percentage = 0.50  # 50% of the width
        center_y_percentage = 0.52  # 52% of the height
        self.frame_center = (int(width * center_x_percentage), int(height * center_y_percentage))
        self.top_left_x_window_capture = top_left_x
        self.top_left_y_window_capture = top_left_y
        self.width_window_capture = width
        self.height_window_capture = height
class App:
    def __init__(self, master):
        self.master = master
        self.master.geometry("400x300")
        self.master.title("bot")
        self.recorder = WindowCapture()
        self.label = tk.Label(master, text="Enter window title")
        self.label.pack(fill=tk.NONE, expand=False, side=tk.TOP, padx=10, pady=10)
        self.window_title_entry = tk.Entry(master)
        self.window_title_entry.pack(fill=tk.NONE, expand=False, side=tk.TOP, padx=10, pady=5)
        self.start_button = tk.Button(master, text="Start", command=self.start_recording)
        self.stop_button = tk.Button(master, text="Stop", command=self.stop_recording, state=tk.DISABLED)
        self.start_button.pack(fill=tk.BOTH, expand=True, side=tk.TOP, padx=10, pady=5)
        self.stop_button.pack(fill=tk.BOTH, expand=True, side=tk.TOP, padx=10, pady=5)
        self.master.protocol("WM_DELETE_WINDOW", self.on_app_close)
    def start_recording(self):
        # Stop any existing recording before starting a new one
        self.recorder.stop_recording()
        window_title = self.window_title_entry.get()
        self.start_button.config(state=tk.DISABLED)
        self.stop_button.config(state=tk.NORMAL)
        self.recorder.start_recording(window_title)

    def stop_recording(self):
        self.start_button.config(state=tk.NORMAL)
        self.stop_button.config(state=tk.DISABLED)
        self.recorder.stop_recording()
        # Clear the clicked hotspots list when the recording is stopped
        self.recorder.clicked_hotspots.clear()

    def on_app_close(self):
        self.recorder.stop_recording()
        self.master.destroy()
if __name__ == "__main__":
    # pick the GPU only when it is actually available instead of assuming CUDA
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    if torch.cuda.is_available():
        print("CUDA is available! PyTorch is using the GPU.")
    else:
        print("CUDA is not available. PyTorch is using the CPU.")
    model = custom(path_or_model="runs/train/agility_priff_detector2/weights/best.pt")  # custom example
    root = tk.Tk()
    app = App(root)
    root.mainloop()