-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
133 lines (105 loc) · 4.02 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python3
import argparse
from coordinator import Coordinator
from cameras.opencv_camera import OpenCVCamera as Camera
from flask_socketio import SocketIO
from flask import Flask, request, send_file, Response
from usecases.check_set import CheckSetUseCase
from motor.motor_controller import MotorController
# Flask application and the Socket.IO server wrapped around it.
app = Flask(__name__)
socketio = SocketIO(app)
# Placeholder: main() rebinds this to a Coordinator before the server starts.
coordinator = None
# Built at import time and shared by the /start and /motor/* route handlers.
motor = MotorController()
@app.route("/viewer")
def getViewer():
    """Serve the file ``viewer.html`` via Flask's ``send_file``."""
    viewer_page = "viewer.html"
    return send_file(viewer_page)
@app.route("/status")
def status():
    """Return whatever the module-level coordinator reports as its status.

    ``coordinator`` is ``None`` until ``main()`` has assigned it.
    """
    current_status = coordinator.status()
    return current_status
@app.route("/start", methods=['POST'])
def start():
    """Launch the motor and the coordinator as Socket.IO background tasks.

    The coordinator is given two callbacks that forward their payloads to
    connected clients as the Socket.IO events ``'frame'`` and ``'set'``.

    Returns:
        The result of ``coordinator.status()`` taken right after both tasks
        have been scheduled.
    """
    def _emit_and_yield(event, payload):
        # Emit, then sleep(0) so other Socket.IO tasks get a chance to run.
        socketio.emit(event, payload)
        socketio.sleep(0)

    def _push_frame(frame):
        _emit_and_yield('frame', frame)

    def _push_feature_set(featureSet):
        _emit_and_yield('set', featureSet)

    # The motor task is scheduled first, then the coordinator task.
    socketio.start_background_task(motor.run, socketio.sleep)
    socketio.start_background_task(
        coordinator.start,
        frameCallback=_push_frame,
        usecaseCallback=_push_feature_set,
    )
    return coordinator.status()
@app.route("/motor/start", methods=["POST"])
def start_motor():
    """Schedule ``motor.run`` as a background task and return the motor status.

    ``socketio.sleep`` is handed to ``motor.run`` as its only argument.
    """
    sleep_fn = socketio.sleep
    socketio.start_background_task(motor.run, sleep_fn)
    return motor.status()
@app.route("/motor/stop", methods=["POST"])
def stop_motor():
    """Call ``motor.stop()`` and return the motor status afterwards."""
    motor.stop()
    status_after_stop = motor.status()
    return status_after_stop
@app.route("/motor/accelerate", methods=["POST"])
def accelerate_motor():
    """Change the motor's step pause by -0.0002 and return the motor status.

    NOTE(review): presumably a shorter pause between steps means a faster
    motor, and the unit is seconds -- confirm against MotorController.
    """
    pause_change = -0.0002
    motor.change_step_pause(pause_change)
    return motor.status()
@app.route("/motor/decelerate", methods=["POST"])
def decelerate_motor():
    """Change the motor's step pause by +0.0002 and return the motor status.

    NOTE(review): presumably a longer pause between steps means a slower
    motor, and the unit is seconds -- confirm against MotorController.
    """
    pause_change = 0.0002
    motor.change_step_pause(pause_change)
    return motor.status()
@app.route("/motor/speed", methods=["POST"])
def change_motor_speed():
    """Adjust the motor's step pause from query-string parameters.

    Query parameters:
        delta: Used when present and non-empty; forwarded as
            ``motor.change_step_pause(delta=<float>)``.
        abs: Used otherwise; forwarded as
            ``motor.change_step_pause(abs=<float>)``.

    Returns:
        ``motor.status()`` on success, or a ``(message, 400)`` response when
        the supplied value is not a finite number.
    """
    import math

    delta = request.args.get('delta')
    if delta:
        param_name, raw_value = 'delta', delta
    else:
        # Indexing (rather than .get) keeps the framework's own handling of a
        # missing 'abs' key; Werkzeug reports that as a 400 Bad Request.
        param_name, raw_value = 'abs', request.args['abs']

    try:
        value = float(raw_value)
    except ValueError:
        value = None

    # Reject unparsable input as well as NaN/inf instead of failing with a 500
    # or handing a non-finite pause to the motor controller. The raw value is
    # deliberately not echoed back into the response body.
    if value is None or not math.isfinite(value):
        return "Parameter '{}' must be a finite number".format(param_name), 400

    if param_name == 'delta':
        motor.change_step_pause(delta=value)
    else:
        # Keyword name required by the controller; no local shadows abs().
        motor.change_step_pause(abs=value)
    return motor.status()
@app.route("/motor/status")
def motor_status():
    """Return the result of ``motor.status()`` without changing anything."""
    current_status = motor.status()
    return current_status
@app.route("/motor/reverse", methods=["POST"])
def reverse_motor():
    """Call ``motor.reverse()`` and return the motor status afterwards."""
    motor.reverse()
    status_after_reverse = motor.status()
    return status_after_reverse
@app.route("/stop", methods=['POST'])
def stop():
    """Call ``coordinator.stop()`` and return the coordinator status afterwards."""
    coordinator.stop()
    status_after_stop = coordinator.status()
    return status_after_stop
@app.route("/configureDetector", methods=['POST'])
def configureDetector():
    """Pass the request's JSON body to ``coordinator.configureDetector``.

    Returns:
        The coordinator status after the configuration call.
    """
    detector_config = request.json
    coordinator.configureDetector(detector_config)
    return coordinator.status()
@app.route("/configureUsecase", methods=['POST'])
def configureUsecase():
    """Pass the request's JSON body to ``coordinator.configureUsecase``.

    Returns:
        The coordinator status after the configuration call.
    """
    usecase_config = request.json
    coordinator.configureUsecase(usecase_config)
    return coordinator.status()
def _load_detector_class(name):
    """Import and return the detector class selected by *name*.

    The import happens inside the matching branch, so only the chosen
    backend's module is imported.

    Raises:
        ValueError: If *name* is not one of the supported detector names.
    """
    if name == 'pycoral':
        print("Loading PyCoral Detector")
        from detectors.pycoral_detector import PyCoralDetector as Detector
    elif name == 'tf2':
        print("Loading TF2 Detector")
        from detectors.tf2_detector import TF2Detector as Detector
    elif name == 'ms-tflite':
        print("Loading MS TFLite-Detector")
        from detectors.ms_detector import MSTFLiteDetector as Detector
    elif name == 'ms-tf2':
        print("Loading MS TF-Detector")
        from detectors.ms_detector import MSTFDetector as Detector
    else:
        # Without this branch an unexpected name would only surface later as
        # an UnboundLocalError on ``Detector``.
        raise ValueError(
            "Unknown detector {!r}; expected one of "
            "'pycoral', 'tf2', 'ms-tflite', 'ms-tf2'".format(name))
    return Detector


def main(args):
    """Build the detection pipeline and run the Socket.IO server.

    Args:
        args: Object with the attributes ``detector`` (detector name),
            ``video`` (passed to the camera as ``videoSource``) and ``model``
            (passed to the detector constructor).

    Raises:
        ValueError: If ``args.detector`` names an unsupported detector.

    Side effects:
        Rebinds the module-level ``coordinator`` and then calls
        ``socketio.run`` on host 0.0.0.0, port 5000.
    """
    global coordinator
    # Resolve the detector first so a bad name fails before the camera opens.
    Detector = _load_detector_class(args.detector)
    camera = Camera(videoSource=args.video)
    detector = Detector(args.model)
    usecase = CheckSetUseCase(detector.labels)
    coordinator = Coordinator(camera, detector, usecase)
    print('[INFO] Starting server at http://localhost:5000')
    socketio.run(app=app, host='0.0.0.0', port=5000)
if __name__ == "__main__":
    # Script entry point: parse the command-line options and pass them to main().
    parser = argparse.ArgumentParser(
        description='Python Detection server using a webcam and models trained with AutoML.',
    )
    parser.add_argument(
        '-v', '--video',
        help='The source of the video (path)',
        default='/dev/video0',
    )
    parser.add_argument(
        '-m', '--model',
        help='The directory containing a model.(pb|tflite) and a corresponding labels.txt, relative to the current dir.',
        required=True,
    )
    parser.add_argument(
        '-d', '--detector',
        help='The detector to be used. Has to be compatible to the model.',
        default="pycoral",
        choices=['pycoral', 'ms-tflite', 'ms-tf2', 'tf2'],
    )
    args = parser.parse_args()
    main(args)