-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__multithread-test.py
85 lines (72 loc) · 2.71 KB
/
__multithread-test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import functools
import threading
import time

from flask import Flask, request
def sanity(function):
    """Decorate *function* so every call is logged and never raises.

    Before the call, a highlighted ``[+]`` line naming the function and its
    positional arguments is printed. If the call raises, a ``[-]`` line with
    the error is printed and the exception object is *returned* instead of
    being propagated.

    Args:
        function: The callable to wrap.

    Returns:
        A wrapper that returns whatever ``function`` returns, or the caught
        ``Exception`` instance when the call fails.
    """
    # Keep __name__/__doc__ of the wrapped callable so decorated functions
    # stay distinguishable from each other (otherwise all are "wrapper").
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        try:
            print(f'\033[46m[+] {function}:{__name__}@{function.__name__}({args}) \033[0m')
            return function(*args, **kwargs)
        except Exception as ex:
            # Deliberate best-effort: report the failure and hand the
            # exception back to the caller as a value.
            print(f'\033[41m[-] {function}:{__name__}@{function.__name__}({args}): FAILED -> {ex} \033[0m')
            return ex
    return wrapper
def threader(function):
    """Decorate *function* so every call runs in a newly started thread.

    A highlighted ``[+]`` line is printed, then a ``threading.Thread`` is
    created for ``function`` with the call's arguments and started. The
    wrapper does not wait for the thread and does not expose its result.

    Only failures while creating/starting the thread are caught here; an
    exception raised inside ``function`` happens in the worker thread and is
    not seen by this wrapper.

    Args:
        function: The callable to run in a thread.

    Returns:
        A wrapper that returns ``None`` once the thread has been started, or
        the caught ``Exception`` instance if starting it failed.
    """
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        try:
            print(f'\033[46m[+] {function}:{__name__}@{function.__name__}({args}) started as thread\033[0m')
            # Pass the arguments through unpacked. Calling function(args)
            # would deliver the whole tuple as a single positional argument.
            threading.Thread(target=function, args=args, kwargs=kwargs).start()
        except Exception as ex:
            print(f'\033[41m[-] {function}:{__name__}@{function.__name__}({args}): FAILED THREAD-> {ex} \033[0m')
            return ex
        return None
    return wrapper
### FLASK ###
# Application object used by the "/" route and by web().
app = Flask(__name__)
app.config.update(DEBUG=True)
# Webserver
@sanity
def web():
    """Start the Flask server for ``app`` on 0.0.0.0:5000.

    Debug mode and threaded request handling are both switched on.
    """
    # NOTE(review): debug=True normally also turns on Werkzeug's reloader,
    # which re-executes the script in a child process - confirm that is
    # intended together with the background thread started in __main__.
    server_options = {
        'debug': True,
        'host': '0.0.0.0',
        'port': 5000,
        'threaded': True,
    }
    app.run(**server_options)
# homepage
@app.route("/", methods=['GET', 'POST'])
def hello_world():
    """Handle GET/POST on "/": trigger ``web_task`` twice, then reply.

    Returns:
        The literal response body ``'WEB THREAD'``.
    """
    # Error recorded while debugging this route:
    #   [-] <function web at 0x7fe48843db40>:__main__@web(()): FAILED -> signal only works in main thread of the main interpreter
    # Variant 1: launch web_task from an explicitly created thread.
    worker = threading.Thread(target=web_task, args=('WEBCALL',))
    worker.start()
    # Variant 2 (noted as producing the same error): call web_task directly.
    web_task('WEBCALL')
    # The solution is in the ``if __name__ == '__main__'`` block.
    return 'WEB THREAD'
### BACKGROUND TASKS ###
# called from main
@sanity
def task(arg1):
    """Announce the task, start ``called_task`` in a thread, then wait.

    Args:
        arg1: Label echoed in the start message.
    """
    print(f"\033[92m[!!!]Start Task with arg1={arg1}\033[0m")
    child = threading.Thread(target=called_task, args=('called',))
    child.start()
    # Six rounds of "print, then sleep 2 s"; the child thread is not joined.
    rounds_left = 6
    while rounds_left > 0:
        print("waiting for task1")
        time.sleep(2)
        rounds_left -= 1
# called from thread
@sanity
def called_task(arg1):
    """Announce the task, then print a waiting line three times, 5 s apart.

    Args:
        arg1: Label echoed in the start message.
    """
    print(f"\033[92m[!!!]Start Task with arg1={arg1}\033[0m")
    rounds_left = 3
    while rounds_left > 0:
        print("waiting for task2")
        time.sleep(5)
        rounds_left -= 1
# called from web
@threader
def web_task(arg1):
    """Print the web-task start banner for *arg1*."""
    banner = f"\033[92m[!!!] Started WebTask with arg1={arg1}\033[0m"
    print(banner)
@sanity
def web_task_caller(arg1):
    """Call ``web_task`` with *arg1* from a freshly started thread."""
    launcher = threading.Thread(target=web_task, args=(arg1,))
    launcher.start()
## EXECUTE ###
if __name__ == '__main__':
    ### FIX ###
    # Start the background threads first.
    background = threading.Thread(target=task, args=('main',))
    background.start()
    # Background: this script defines a Flask application combined with
    # several threading operations. The main issue stemmed from starting a
    # new thread from within a Flask route, which then called a function that
    # potentially interacts with signals or other operations that must run
    # in the main thread.
    # Fix: the Flask app now runs directly in the main thread when the script
    # is executed, and the threading logic in the route avoids operations
    # that require main-thread execution.
    web()