add backtesting #20

Open
wants to merge 2 commits into base: master
6 changes: 6 additions & 0 deletions README.md
@@ -59,6 +59,7 @@ If everything is configured then the following scripts have to be executed:
* `python -m scripts.labels -c config.json`
* `python -m scripts.train -c config.json`
* `python -m scripts.train_signals -c config.json`
* `python -m scripts.backtest -c config.json`

Without a configuration file the scripts will use the default parameters, which is useful for testing purposes but is not intended to demonstrate good performance.

@@ -126,6 +127,11 @@ Configuration:
This script simulates trades using many buy-sell signal parameters and then chooses the best performing signal parameters:
* Script: `python -m scripts.train_signals -c config.json`

## Backtest the model

This script uses the trained model together with the previously created features to backtest the strategy defined in the configuration (a sketch of the relevant configuration entries is shown below the script line):
* Script: `python -m scripts.backtest -c config.json`
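A minimal sketch of the configuration entries `scripts/backtest.py` relies on, shown here as an annotated Python dict (the actual `config.json` uses the same keys; all values below are illustrative only):

```python
# Illustrative subset of config.json used by scripts/backtest.py; values are examples only.
config = {
    "symbol": "BTCUSDT",              # sub-folder of data_folder holding the input file
    "data_folder": "DATA",            # root folder with per-symbol data
    "time_column": "timestamp",       # datetime column used as the index
    "feature_file_name": "features",  # read as <data_folder>/<symbol>/features.csv
    "score_aggregation": {            # labels whose trained models produce the scores
        "buy_labels": ["high_10"],
        "sell_labels": ["low_10"],
    },
    "signal_model": {},               # thresholds passed to apply_rule_with_score_thresholds (contents not shown here)
}
```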

# Prediction online based on trained models (service)

This script starts a service which periodically executes one and the same task: load latest data, generate features, make predictions, generate signals, notify subscribers:
1 change: 1 addition & 0 deletions requirements.txt
@@ -18,3 +18,4 @@ python-binance>=1.0.* # pip install python-binance
# yfinance # if yahoo quotes are going to be used
#python-snappy # Compression for fastparquet (gzip is always available)
#fastparquet # "conda install -c conda-forge fastparquet"
backtesting>=0.3.3
65 changes: 65 additions & 0 deletions scripts/backtest.py
@@ -0,0 +1,65 @@
import click

from backtesting import Backtest, Strategy

from service.App import *
from service.analyzer import *

import logging
log = logging.getLogger('backtest')

@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
load_config(config_file)
symbol = App.config["symbol"]
time_column = App.config["time_column"]
log.info(f"Starting backtesting for: {symbol}. ")

App.analyzer = Analyzer(App.config)
data_path = Path(App.config["data_folder"]) / symbol
file_path = (data_path / App.config.get("feature_file_name")).with_suffix(".csv")
if not file_path.is_file():
log.error(f"Data file does not exist: {file_path}")
return

log.info(f"Loading data from source data file {file_path}...")
df = pd.read_csv(file_path, parse_dates=[time_column], index_col=time_column)

df = App.analyzer.generate_score(df)
df = App.analyzer.aggregate_post_process(df)

signal_model = App.config['signal_model']
apply_rule_with_score_thresholds(df, signal_model, 'buy_score_column', 'sell_score_column')
if len(df.loc[df["buy_signal_column"] | df["sell_signal_column"]]) == 0:
log.info("No buy or sell signals in dataset")
return

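# The backtesting library expects capitalized OHLCV column names (Open, High, Low, Close, Volume)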
df.rename(columns={'open':'Open', 'high':'High', 'low':'Low', 'close':'Close', 'volume':'Volume'}, inplace=True)
log.info(f"Finished loading {len(df)} records with {len(df.columns)} columns.")

class ITB(Strategy):
def init(self):
pass
def next(self):
buy_signal = self.data.df["buy_signal_column"].iloc[-1]
sell_signal = self.data.df["sell_signal_column"].iloc[-1]

if buy_signal:
if self.position.is_short:
self.position.close()
elif not self.position.is_long:
self.buy(size=.1)
if sell_signal:
if self.position.is_long:
self.position.close()
elif not self.position.is_short:
self.sell(size=.1)

bt = Backtest(df, ITB, cash=50000, commission=.002)
stats = bt.run()
bt.plot()
log.info(stats)

if __name__ == '__main__':
main()
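Not part of this PR: a minimal sketch, assuming the same prepared `df` (capitalized OHLCV columns plus `buy_signal_column` and `sell_signal_column`), of how the fixed entry size could be promoted to a strategy parameter and swept with the `backtesting` library's grid optimizer:

```python
from backtesting import Backtest, Strategy

class SizedITB(Strategy):
    # Class-level attributes become tunable parameters for Backtest.optimize()
    size_frac = 0.1  # fraction of available equity committed per entry

    def init(self):
        pass

    def next(self):
        buy_signal = self.data.df["buy_signal_column"].iloc[-1]
        sell_signal = self.data.df["sell_signal_column"].iloc[-1]
        if buy_signal and not self.position.is_long:
            self.position.close()          # close a short if open (no-op when flat)
            self.buy(size=self.size_frac)
        elif sell_signal and not self.position.is_short:
            self.position.close()          # close a long if open (no-op when flat)
            self.sell(size=self.size_frac)

bt = Backtest(df, SizedITB, cash=50_000, commission=.002)
stats = bt.optimize(size_frac=[0.05, 0.1, 0.2, 0.5], maximize='Equity Final [$]')
print(stats)
```

`Backtest.optimize()` runs a grid search over the listed values and returns the stats of the best-performing run.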
133 changes: 73 additions & 60 deletions service/analyzer.py
@@ -60,8 +60,8 @@ def __init__(self, config):
model_path = PACKAGE_ROOT / model_path
model_path = model_path.resolve()

buy_labels = App.config["buy_labels"]
sell_labels = App.config["sell_labels"]
buy_labels = App.config["score_aggregation"]["buy_labels"]
sell_labels = App.config["score_aggregation"]["sell_labels"]
self.models = {label: load_model_pair(model_path, label) for label in buy_labels + sell_labels}

#
@@ -268,59 +268,8 @@ def store_queue(self):
with open(file, 'a+') as f:
f.write(data_str + "\n")

#
# Analysis (features, predictions, signals etc.)
#

def analyze(self):
"""
1. Convert klines to df
2. Derive (compute) features (use same function as for model training)
3. Derive (predict) labels by applying models trained for each label
4. Generate buy/sell signals by applying rule models trained for best overall trade performance
"""
symbol = App.config["symbol"]

last_kline_ts = self.get_last_kline_ts(symbol)
last_kline_ts_str = str(pd.to_datetime(last_kline_ts, unit='ms'))

log.info(f"Analyze {symbol}. Last kline timestamp: {last_kline_ts_str}")

#
# 1.
# MERGE: Produce a single data frame with input data from all sources
#
data_sources = App.config.get("data_sources", [])
if not data_sources:
data_sources = [{"folder": App.config["symbol"], "file": "klines", "column_prefix": ""}]

# Read data from online sources into data frames
for ds in data_sources:
if ds.get("file") == "klines":
try:
klines = self.klines.get(ds.get("folder"))
df = klines_to_df(klines)

# Validate
source_columns = ['open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_av', 'trades', 'tb_base_av', 'tb_quote_av']
if df.isnull().any().any():
null_columns = {k: v for k, v in df.isnull().any().to_dict().items() if v}
log.warning(f"Null in source data found. Columns with Null: {null_columns}")
# TODO: We might receive empty strings or 0s in numeric data - how can we detect them?
# TODO: Check that timestamps in 'close_time' are strictly consecutive
except Exception as e:
log.error(f"Error in klines_to_df method: {e}. Length klines: {len(klines)}")
return
else:
log.error("Unknown data sources. Currently only 'klines' is supported. Check 'data_sources' in config, key 'file'")
return
ds["df"] = df

# Merge in one df with prefixes and common regular time index
df = merge_data_sources(data_sources)

def generate_features(self, df:pd.DataFrame, all:bool=False) -> pd.DataFrame:
#
# 2.
# Generate all necessary derived features (NaNs are possible due to short history)
#
# We want to generate features only for the last rows (for performance reasons)
@@ -338,13 +287,18 @@ def analyze(self):

# Apply all feature generators to the data frame which get accordingly new derived columns
# The feature parameters will be taken from App.config (depending on generator)
if all:
last_rows = 0

for fs in feature_sets:
df, _ = generate_feature_set(df, fs, last_rows=last_rows)

df = df.iloc[-last_rows:] # For signal generation, we will need only the last few rows

return df
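A hypothetical illustration (not part of this PR) of how the new `all` flag changes the scope of feature generation: with `all=True` features are recomputed over the whole history (e.g. for offline research or backtesting), while the default computes only the configured number of most recent rows, which is what the online service needs:

```python
# Hypothetical usage of the refactored method; `analyzer` is an Analyzer instance.
df_full = analyzer.generate_features(df, all=True)  # all rows, e.g. for offline backtesting
df_tail = analyzer.generate_features(df)            # only the last rows, as in analyze()
```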

def generate_score(self, df:pd.DataFrame) -> pd.DataFrame:
#
# 3.
# Apply ML models and generate score columns
#

@@ -384,12 +338,10 @@ def analyze(self):

# This df contains only one (last) record
df = df.join(score_df)
#df = pd.concat([predict_df, score_df], axis=1)

#
# 4.
# Aggregate and post-process
#
return df

def aggregate_post_process(self, df:pd.DataFrame) -> pd.DataFrame:
sa_sets = ['score_aggregation', 'score_aggregation_2']
for i, score_aggregation_set in enumerate(sa_sets):
score_aggregation = App.config.get(score_aggregation_set)
@@ -411,7 +363,68 @@ def analyze(self):
aggregate_scores(df, score_aggregation, sell_column, sell_labels)
# Mutually adjust two independent scores with opposite semantics
combine_scores(df, score_aggregation, buy_column, sell_column)
return df
#
# Analysis (features, predictions, signals etc.)
#

def analyze(self):
"""
1. Convert klines to df
2. Derive (compute) features (use same function as for model training)
3. Derive (predict) labels by applying models trained for each label
4. Generate buy/sell signals by applying rule models trained for best overall trade performance
"""
symbol = App.config["symbol"]

last_kline_ts = self.get_last_kline_ts(symbol)
last_kline_ts_str = str(pd.to_datetime(last_kline_ts, unit='ms'))

log.debug(f"Analyze {symbol}. Last kline timestamp: {last_kline_ts_str}")

#
# 1.
# MERGE: Produce a single data frame with input data from all sources
#
data_sources = App.config.get("data_sources", [])
if not data_sources:
data_sources = [{"folder": App.config["symbol"], "file": "klines", "column_prefix": ""}]

# Read data from online sources into data frames
for ds in data_sources:
if ds.get("file") == "klines":
try:
klines = self.klines.get(ds.get("folder"))
df = klines_to_df(klines)

# Validate
source_columns = ['open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_av', 'trades', 'tb_base_av', 'tb_quote_av']
if df.isnull().any().any():
null_columns = {k: v for k, v in df.isnull().any().to_dict().items() if v}
log.warning(f"Null in source data found. Columns with Null: {null_columns}")
# TODO: We might receive empty strings or 0s in numeric data - how can we detect them?
# TODO: Check that timestamps in 'close_time' are strictly consecutive
except Exception as e:
log.error(f"Error in klines_to_df method: {e}. Length klines: {len(klines)}")
return
else:
log.error("Unknown data sources. Currently only 'klines' is supported. Check 'data_sources' in config, key 'file'")
return
ds["df"] = df

# Merge in one df with prefixes and common regular time index
df = merge_data_sources(data_sources)

# 2.
df = self.generate_features(df)

# 3.
df = self.generate_score(df)
#df = pd.concat([predict_df, score_df], axis=1)

# 4.
df = self.aggregate_post_process(df)

#
# 5.
# Apply rule to last row