-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPTZSession.py
120 lines (89 loc) · 3.25 KB
/
PTZSession.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import time
import threading
import PTZControl
import cv2
import uuid
import base64
class PTZSession:
def __init__(self):
self.session_id = str(uuid.uuid4())
self.ptz = None
self.video_thread = None
self.video_thread_running = False
self.session_thread = None
self.frame = None
self.frame_width = 0
self.frame_height = 0
self.timeout = 30
self.last_time = 0
self.session_end = False
def create_session(self, ip, port, username, password):
self.ip = ip
self.port = port
self.username = username
self.password = password
self.stop_video_thread()
self.ptz = PTZControl.PTZControl(ip, port, username, password)
if self.ptz.is_connected():
res = self.ptz.get_stream_uri()
rtsp_uri = res['data']['Uri'].replace("rtsp://", "rtsp://"+self.username+":"+self.password+"@")
self.video_thread_running = True
self.video_thread = threading.Thread(target=self.video_thread_func, args=(rtsp_uri,))
self.video_thread.start()
self.activate_session()
self.session_thread = threading.Thread(target=self.session_thread_func)
self.session_thread.start()
return self.session_id
else:
return ''
def change_profile(self, profile):
# print("change profile")
self.ptz.set_profile(profile)
self.stop_video_thread()
# print("cap thread exit")
if self.ptz.is_connected():
res = self.ptz.get_stream_uri()
rtsp_uri = res['data']['Uri'].replace("rtsp://", "rtsp://"+self.username+":"+self.password+"@")
# print(rtsp_uri)
self.video_thread_running = True
self.video_thread = threading.Thread(target=self.video_thread_func, args=(rtsp_uri,))
self.video_thread.start()
self.activate_session()
def stop_video_thread(self):
if self.video_thread is not None:
self.video_thread_running = False
self.video_thread.join()
self.video_thread = None
def video_thread_func(self, uri):
cap = cv2.VideoCapture(uri)
self.frame_width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
self.frame_height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
# print(str(int(self.frame_width)) + "x" + str(int(self.frame_height)))
while self.video_thread_running:
ret, frame = cap.read()
if ret:
self.frame = frame
else:
self.video_thread_running = False
break
cap.release()
print("Video Stream Thread exit")
def session_thread_func(self):
print("Session Thread for " + self.ip + ":" + str(self.port) + " started")
while (time.time() - self.last_time) < self.timeout:
time.sleep(1)
# Session timeout
self.stop_video_thread()
print("Session Thread for " + self.ip + ":" + str(self.port) + " end")
self.session_end = True
def is_session_end(self):
return self.session_end
def activate_session(self):
self.last_time = time.time()
def snapshot(self):
if self.frame is not None:
jpeg = cv2.imencode('.jpg', self.frame)[1]
image_base64 = 'data:image/jpeg;base64,' + str(base64.b64encode(jpeg))[2:-1]
return {'code': 200, 'message': 'snapshot', 'data': {'w': self.frame_width, 'h': self.frame_height, 'image': image_base64}}
else:
return {'code': 500, 'message': 'No frame received', 'data': {}}