-
Notifications
You must be signed in to change notification settings - Fork 31
/
os_signalbuy_RECOMM.py
141 lines (114 loc) · 4.45 KB
/
os_signalbuy_RECOMM.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Based off Firewatch custsignalmod.py
from tradingview_ta import TA_Handler, Interval, Exchange
# use for environment variables
import os
# use if needed to pass args to external modules
import sys
# used for directory handling
import glob
import threading
import time
# my helper utils
from helpers.os_utils import(rchop)
# --- TradingView query settings -------------------------------------------
MY_EXCHANGE = 'BINANCE'
MY_SCREENER = 'CRYPTO'

# The three candle intervals that analyze() queries for every pair.
MY_FIRST_INTERVAL = Interval.INTERVAL_1_MINUTE
MY_SECOND_INTERVAL = Interval.INTERVAL_5_MINUTES
MY_THIRD_INTERVAL = Interval.INTERVAL_15_MINUTES

# Suffix appended to every ticker read from the tickers file.
PAIR_WITH = 'USDT'

TIME_TO_WAIT = 1  # Minutes to wait between analysis runs
FULL_LOG = True  # Print the analysis result of every pair to the console

# --- Files ----------------------------------------------------------------
SIGNAL_NAME = 'os_signalbuy_RECOMM'
SIGNAL_FILE = f'signals/{SIGNAL_NAME}.buy'

# A module-specific ticker list, when present, replaces the default one.
TICKERS_OVERRIDE = 'tickers_signalbuy.txt'
TICKERS = TICKERS_OVERRIDE if os.path.exists(TICKERS_OVERRIDE) else 'tickers.txt'

# Coins whose analysis request raised an exception are appended to this file.
TRADINGVIEW_EX_FILE = 'tradingview_ta_unknown'
def analyze(pairs):
    """Return the pairs rated as a buy on all three configured intervals.

    For every symbol in ``pairs`` one ``TA_Handler`` per interval
    (``MY_FIRST_INTERVAL``, ``MY_SECOND_INTERVAL``, ``MY_THIRD_INTERVAL``) is
    created and its analysis is fetched.  A pair is signalled only when the
    summary recommendation is ``BUY`` or ``STRONG_BUY`` on every interval.

    Side effects:
        * ``SIGNAL_FILE`` and ``TRADINGVIEW_EX_FILE`` are deleted (if present)
          at the start of each call, so both only reflect the current run.
        * Each signalled pair is appended to ``SIGNAL_FILE``, one per line.
        * When fetching the analysis of a pair raises, the error is printed,
          ``rchop(pair, PAIR_WITH)`` is appended to ``TRADINGVIEW_EX_FILE``
          and the pair is skipped.

    Args:
        pairs: Iterable of symbol names to analyse.

    Returns:
        dict: every signalled pair, mapped to itself.
    """
    buy_ratings = ('BUY', 'STRONG_BUY')
    intervals = (MY_FIRST_INTERVAL, MY_SECOND_INTERVAL, MY_THIRD_INTERVAL)
    signal_coins = {}

    # Drop the output of the previous run before producing new results.
    for stale_file in (SIGNAL_FILE, TRADINGVIEW_EX_FILE):
        if os.path.exists(stale_file):
            os.remove(stale_file)

    for pair in pairs:
        first_handler, second_handler, third_handler = (
            TA_Handler(
                symbol=pair,
                exchange=MY_EXCHANGE,
                screener=MY_SCREENER,
                interval=interval,
                timeout=10,
            )
            for interval in intervals
        )

        # Only the fetch is guarded: a failure for one coin must not abort
        # the analysis of the remaining coins.
        try:
            first_analysis = first_handler.get_analysis()
            second_analysis = second_handler.get_analysis()
            third_analysis = third_handler.get_analysis()
        except Exception as e:
            print(f'{SIGNAL_NAME}')
            print("Exception:")
            print(e)
            print(f'Coin: {pair}')
            print(f'First handler: {first_handler}')
            print(f'Second handler: {second_handler}')
            print(f'Third handler: {third_handler}')
            # The coin is recorded via rchop(pair, PAIR_WITH) -- presumably the
            # pair name without its PAIR_WITH suffix, like str.removesuffix.
            with open(TRADINGVIEW_EX_FILE, 'a+') as f:
                f.write(rchop(pair, PAIR_WITH) + '\n')
            continue

        first_recommendation = first_analysis.summary['RECOMMENDATION']
        second_recommendation = second_analysis.summary['RECOMMENDATION']
        third_recommendation = third_analysis.summary['RECOMMENDATION']

        if FULL_LOG:
            print(f'{SIGNAL_NAME}: {pair} First {first_recommendation} Second {second_recommendation} Third {third_recommendation}')

        # All three intervals have to agree before a buy signal is emitted.
        if (first_recommendation in buy_ratings
                and second_recommendation in buy_ratings
                and third_recommendation in buy_ratings):
            print(f'{SIGNAL_NAME}: buy signal detected on {pair}')
            signal_coins[pair] = pair
            with open(SIGNAL_FILE, 'a+') as f:
                f.write(pair + '\n')

    return signal_coins
def do_work():
    """Run the analysis loop forever, once every ``TIME_TO_WAIT`` minutes.

    Each cycle:
        1. If the ``TICKERS`` file does not exist, sleep and try again.
        2. Read the tickers (one per line) and append ``PAIR_WITH`` to each
           to form the pair names; blank lines are ignored.
        3. Terminate via ``sys.exit()`` when the main thread is no longer
           alive.
        4. Call ``analyze`` on the pairs, report the number of signals and
           sleep until the next cycle.

    The function does not return normally.
    """
    while True:
        try:
            if not os.path.exists(TICKERS):
                time.sleep(TIME_TO_WAIT * 60)
                continue

            # Read the file once and close it again; blank lines would
            # otherwise turn into a bogus pair consisting only of PAIR_WITH.
            with open(TICKERS) as tickers_file:
                pairs = [
                    line.strip() + PAIR_WITH
                    for line in tickers_file
                    if line.strip()
                ]

            # Stop working once the main thread has gone away.
            if not threading.main_thread().is_alive():
                sys.exit()

            print(f'{SIGNAL_NAME}: Analyzing {len(pairs)} coins')
            signal_coins = analyze(pairs)
            print(f'{SIGNAL_NAME}: {len(signal_coins)} coins with Buy Signals. Waiting {TIME_TO_WAIT} minutes for next analysis.')
            time.sleep(TIME_TO_WAIT * 60)
        except KeyboardInterrupt:
            # NOTE(review): Ctrl-C is swallowed here and the next cycle starts
            # immediately -- confirm this is the intended behaviour.
            continue