-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathConsumer.py
252 lines (191 loc) · 10.6 KB
/
Consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
import re
import warnings
import pandas as pd
import numpy as np
import pytz
from cachetools import TTLCache
from datetime import datetime, timedelta
from functools import reduce
from pandas import DataFrame
import talib.abstract as ta
import freqtrade.vendor.qtpylib.indicators as qtpylib
from freqtrade.exchange import timeframe_to_minutes, timeframe_to_prev_date
from freqtrade.strategy import (IStrategy, merge_informative_pair, stoploss_from_open, informative,
IntParameter, DecimalParameter, CategoricalParameter)
from freqtrade.persistence import Trade
import psycopg2
from questdb.ingress import Sender
from sqlalchemy import create_engine, text
from sqlalchemy.exc import DatabaseError
#from prettytable import PrettyTable
import logging
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
warnings.simplefilter(action='ignore', category=pd.errors.PerformanceWarning)
"""
Freqtrade Consumer Strategy
to receive the dataframes of producers and store them in QuestDB for storage or further processing
Requirements:
pip install influxdb, questdb, cachetools
"""
class Consumer(IStrategy):
def version(self) -> str:
return "v1.0"
# Trailing stop:
trailing_stop = False
timeframe = '5m'
can_short = True
store_refresh_period = 300 # 5 minutes
pair_cache = TTLCache(maxsize=100, ttl=store_refresh_period)
stoploss = -0.06
use_custom_stoploss = False
use_entry_signal = True
entry_profit_only = False
ignore_roi_if_entry_signal = False
process_only_new_candles = False # required for consumers
use_custom_stoploss = False
engine = None
producers = ['ft_1', 'ft_2', 'ft_3', 'ft_4','ft_5']
producer_tfs = {
'ft_1': '5m',
'ft_2': '5m',
'ft_3': '5m',
'ft_4': '5m',
'ft_5': '5m',
}
def bot_start(self, **kwargs) -> None:
conn_str = 'postgresql+psycopg2://admin:quest@questdb:8812/qdb'
self.engine = create_engine(conn_str, echo=False)
return
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata['pair']
dataframe['mid'] = (dataframe['open'] + dataframe['close']) / 2.0
timeframe = self.timeframe
#log.debug(f"Processing indicators for pair {pair} with main timeframe {timeframe}")
expected_columns = [] # Initialize the variable before the loop
required_columns_default_0 = []
required_columns_default_none = []
for producer in self.producers:
producer_timeframe = self.producer_tfs.get(producer, '5m')
#log.debug(f"Fetching data for producer {producer} with timeframe {producer_timeframe}")
producer_dataframe, _ = self.dp.get_producer_df(pair, producer_name=producer, timeframe=producer_timeframe)
if not producer_dataframe.empty:
#log.debug(f"Data for producer {producer} is not empty.")
columns = [
f'enter_long_{producer}',
f'exit_long_{producer}',
f'enter_short_{producer}',
f'exit_short_{producer}',
]
expected_columns.extend(columns)
required_columns_default_0.extend([f'{col}_{producer}' for col in ['enter_long', 'enter_short', 'exit_long', 'exit_short']])
required_columns_default_none.extend([f'{col}_{producer}' for col in ['enter_tag','exit_tag']])
# Check if the suffixed versions of 'enter_long' and 'enter_short' are in producer_dataframe
should_store_df = all(col in producer_dataframe.columns for col in ['enter_long', 'enter_short'])
cache_key = f"{pair}_{producer}"
if cache_key not in self.pair_cache and should_store_df:
sdf = producer_dataframe.copy()
self.store_df(sdf, producer, pair, self.engine, host='questdb', port=9009)
#log.debug(f"Storing dataframe for pair {pair} and producer {producer} completed.")
self.pair_cache[cache_key] = True
dataframe = merge_informative_pair(dataframe, producer_dataframe, timeframe,
producer_timeframe, # Using the producer's timeframe here
append_timeframe=False,
suffix=producer, ffill=True)
else:
log.debug(f"Data for producer {producer} is empty.")
for col in expected_columns:
if col not in dataframe.columns:
if col in required_columns_default_0:
dataframe[col] = 0
elif col in required_columns_default_none:
dataframe[col] = None
dataframe['rsi'] = ta.RSI(dataframe, timeperiod=14)
dataframe['mfi'] = ta.MFI(dataframe, timeperiod=14)
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe.loc[:, 'enter_tag'] = ''
long_conditions = [(dataframe[f'enter_long_{producer}'].fillna(False).astype(bool)) for producer in self.producers if f'enter_long_{producer}' in dataframe.columns]
short_conditions = [(dataframe[f'enter_short_{producer}'].fillna(False).astype(bool)) for producer in self.producers if f'enter_short_{producer}' in dataframe.columns]
# Checking and combining all producer conditions
if long_conditions:
combined_long_condition = reduce(lambda x, y: x & y, long_conditions)
dataframe.loc[combined_long_condition, 'enter_tag'] += 'consumer_enter '
dataframe.loc[combined_long_condition, 'enter_long'] = 1
else:
dataframe['enter_long'] = 0
if short_conditions:
combined_short_condition = reduce(lambda x, y: x & y, short_conditions)
dataframe.loc[combined_short_condition, 'enter_tag'] += 'consumer_enter '
dataframe.loc[combined_short_condition, 'enter_short'] = 1
else:
dataframe['enter_short'] = 0
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe.loc[:, 'exit_tag'] = ''
long_exit_conditions = [(dataframe[f'exit_long_{producer}'].fillna(False).astype(bool)) for producer in self.producers if f'exit_long_{producer}' in dataframe.columns]
short_exit_conditions = [(dataframe[f'exit_short_{producer}'].fillna(False).astype(bool)) for producer in self.producers if f'exit_short_{producer}' in dataframe.columns]
# Checking and combining all producer conditions
if long_exit_conditions:
combined_long_exit_condition = reduce(lambda x, y: x & y, long_exit_conditions)
dataframe.loc[combined_long_exit_condition, 'exit_tag'] += 'consumer_exit '
dataframe.loc[combined_long_exit_condition, 'exit_long'] = 1
else:
dataframe['exit_long'] = 0
if short_exit_conditions:
combined_short_exit_condition = reduce(lambda x, y: x & y, short_exit_conditions)
dataframe.loc[combined_short_exit_condition, 'exit_tag'] += 'consumer_exit '
dataframe.loc[combined_short_exit_condition, 'exit_short'] = 1
else:
dataframe['exit_short'] = 0
return dataframe
def store_df(self, sdf: DataFrame, strategy: str, pair: str, engine, host: str = 'questdb', port: int = 9009) -> None:
""" Store the sdf into a database for a specific strategy and pair. """
#log.debug("Starting store_df function...")
ft_pair = pair
pair = pair.split('/')[0]
sdf.insert(0, 'pair', pair)
sdf.insert(1, 'ft_pair', ft_pair)
sdf.insert(2, 'time_interval', self.timeframe)
sdf_clean = self.clean_dataframe(sdf)
#log.debug("Cleaned the dataframe...")
with engine.connect() as connection:
try:
last_inserted_timestamp = self.get_last_inserted_timestamp(connection, strategy, pair)
log.debug(f"Last inserted row for strategy {strategy} and pair {pair}: {last_inserted_timestamp}")
data_to_insert = self.get_data_to_insert(sdf_clean, last_inserted_timestamp)
if not data_to_insert.empty:
self.send_data_to_sender(data_to_insert, host, port, strategy)
log.info(f"New rows stored for strategy: {strategy}, pair: {pair}")
connection.commit()
except DatabaseError as e:
self.handle_database_error(e, sdf_clean, strategy, pair, host, port)
def clean_dataframe(self, sdf_clean: DataFrame) -> DataFrame:
sdf_clean.columns = [re.sub('[%&-]', '_', col) for col in sdf_clean.columns]
sdf_clean = sdf_clean.replace(True, 1).replace(False, 0)
return sdf_clean
def get_last_inserted_timestamp(self, connection, strategy, pair) -> datetime:
query = text(f'select max(timestamp) from {strategy} where pair = :pair')
result = connection.execute(query, {"pair": pair}).fetchone()
if result and result[0]:
#log.debug(f"Timestamp retrieved from the database: {result[0]}")
return result[0].replace(tzinfo=pytz.UTC)
else:
#log.debug("No rows found in the database. Returning earliest datetime in dataframe...")
return datetime.min.replace(tzinfo=pytz.UTC)
def send_data_to_sender(self, data_to_insert: DataFrame, host: str, port: int, strategy: str) -> None:
with Sender(host, port) as sender:
#log.debug(f"Sending data to QuestDB {host}:{port} for table {strategy}...")
sender.dataframe(data_to_insert, table_name=strategy, at='date')
sender.flush()
#log.debug("Data sent successfully.")
def handle_database_error(self, e: Exception, sdf: DataFrame, strategy: str, pair: str, host: str, port: int) -> None:
if "table does not exist" in str(e).lower():
log.debug("Table does not exist. QuestDB will create the table.")
self.send_data_to_sender(sdf, host, port, strategy)
else:
log.error(f"Unhandled database error: {e}")
def get_data_to_insert(self, sdf: DataFrame, last_inserted_timestamp: datetime) -> DataFrame:
sdf['date'] = pd.to_datetime(sdf['date'])
filtered_df = sdf[sdf['date'] > last_inserted_timestamp]
return filtered_df