-
Notifications
You must be signed in to change notification settings - Fork 0
/
camera.py
49 lines (38 loc) · 1.32 KB
/
camera.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import binascii
import threading
from collections import deque
from time import sleep
#from utils import base64_to_pil_image, pil_image_to_base64
class Camera(object):
    """Relay frames from an input queue to an output queue on a worker thread.

    Frames handed to :meth:`enqueue_input` are picked up by a background
    daemon thread and become available, in arrival order, from
    :meth:`get_frame`.

    NOTE(review): ``process`` is stored on the instance but never called.
    The decode -> ``process.process(image)`` -> encode pipeline is currently
    disabled, so every frame is passed through unchanged.
    """

    def __init__(self, process):
        """Create the queues and start the background worker thread.

        Args:
            process: Frame processor object. It is only stored as
                ``self.process``; nothing in this class invokes it.
        """
        # deque gives O(1) pops from the left with thread-safe
        # append()/popleft(); list.pop(0) shifts every remaining element.
        self.to_process = deque()
        self.to_output = deque()
        self.process = process

        thread = threading.Thread(target=self.keep_processing, args=())
        # Daemon thread: the endless worker loop must not block interpreter
        # shutdown.
        thread.daemon = True
        thread.start()

    def process_one(self):
        """Move the oldest pending input, if any, to the output queue.

        Returns:
            None. Does nothing when no input is pending.
        """
        # EAFP: a single popleft() replaces the check-then-pop pair, so the
        # queue cannot be emptied by another thread between the two steps.
        try:
            # Presumably a base64-encoded image as an ASCII string -- TODO
            # confirm against the caller.
            input_str = self.to_process.popleft()
        except IndexError:
            return

        # Pass-through: the frame is forwarded exactly as it was received.
        self.to_output.append(input_str)

    def keep_processing(self):
        """Run forever, forwarding one pending frame roughly every 10 ms."""
        while True:
            self.process_one()
            sleep(0.01)

    def enqueue_input(self, input):
        """Append ``input`` to the queue of frames awaiting processing.

        Args:
            input: The frame to queue; it is stored as-is.
        """
        self.to_process.append(input)

    def get_frame(self):
        """Block until an output frame is available and return the oldest.

        Polls the output queue every 50 ms while it is empty. There is no
        timeout: the call does not return until a frame arrives.

        Returns:
            The oldest frame in the output queue.
        """
        while True:
            try:
                return self.to_output.popleft()
            except IndexError:
                # Nothing ready yet (or another caller took the frame first).
                sleep(0.05)