diff --git a/README.md b/README.md
index 6749b18..ecb0e60 100644
--- a/README.md
+++ b/README.md
@@ -59,6 +59,7 @@ If everything is configured then the following scripts have to be executed:
 * `python -m scripts.labels -c config.json`
 * `python -m scripts.train -c config.json`
 * `python -m scripts.train_signals -c config.json`
+* `python -m scripts.backtest -c config.json`
 
 Without a configuration file the scripts will use the default parameters which is useful for testing purposes and not intended for showing good performance.
 
@@ -126,6 +127,11 @@ Configuration:
 This script simulates trades using many buy-sell signal parameters and then chooses the best performing signal parameters:
 * Script: `python -m scripts.train_signals -c config.json`
 
+## Backtest the model
+
+This script uses the trained model together with the generated features to backtest the strategy defined in the configuration:
+* Script: `python -m scripts.backtest -c config.json`
+
 # Prediction online based on trained models (service)
 
 This script starts a service which periodically executes one and the same task: load latest data, generate features, make predictions, generate signals, notify subscribers:
diff --git a/requirements.txt b/requirements.txt
index 4a7951e..de81a90 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -18,3 +18,4 @@ python-binance>=1.0.*  # pip install python-binance
 # yfinance  # if yahoo quotes are going to be used
 #python-snappy  # Compression for fastparquet (gzip is always available)
 #fastparquet  # "conda install -c conda-forge fastparquet"
+backtesting>=0.3.3
\ No newline at end of file
diff --git a/scripts/backtest.py b/scripts/backtest.py
new file mode 100644
index 0000000..a575101
--- /dev/null
+++ b/scripts/backtest.py
@@ -0,0 +1,65 @@
+import click
+
+from backtesting import Backtest, Strategy
+
+from service.App import *
+from service.analyzer import *
+
+import logging
+log = logging.getLogger('backtest')
+
+@click.command()
+@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
+def main(config_file):
+    load_config(config_file)
+    symbol = App.config["symbol"]
+    time_column = App.config["time_column"]
+    log.info(f"Starting backtesting for: {symbol}.")
+
+    App.analyzer = Analyzer(App.config)
+    data_path = Path(App.config["data_folder"]) / symbol
+    file_path = (data_path / App.config.get("feature_file_name")).with_suffix(".csv")
+    if not file_path.is_file():
+        log.error(f"Data file does not exist: {file_path}")
+        return
+
+    log.info(f"Loading data from source data file {file_path}...")
+    df = pd.read_csv(file_path, parse_dates=[time_column], index_col=time_column)
+
+    df = App.analyzer.generate_score(df)
+    df = App.analyzer.aggregate_post_process(df)
+
+    signal_model = App.config['signal_model']
+    apply_rule_with_score_thresholds(df, signal_model, 'buy_score_column', 'sell_score_column')
+    if len(df.loc[df["buy_signal_column"] | df["sell_signal_column"]]) == 0:
+        log.info("No buy or sell signals in dataset")
+        return
+
+    df.rename(columns={'open':'Open', 'high':'High', 'low':'Low', 'close':'Close', 'volume':'Volume'}, inplace=True)
+    log.info(f"Finished loading {len(df)} records with {len(df.columns)} columns.")
+
+    class ITB(Strategy):
+        def init(self):
+            pass
+        def next(self):
+            buy_signal = self.data.df["buy_signal_column"].iloc[-1:].values[0]
+            sell_signal = self.data.df["sell_signal_column"].iloc[-1:].values[0]
+
+            if buy_signal == True:
+                if self.position.is_short:
+                    self.position.close()
+                elif not self.position.is_long:
+                    self.buy(size=.1)
+            if sell_signal == True:
+                if self.position.is_long:
+                    self.position.close()
+                elif not self.position.is_short:
+                    self.sell(size=.1)
+
+    bt = Backtest(df, ITB, cash=50000, commission=.002)
+    stats = bt.run()
+    bt.plot()
+    log.info(stats)
+
+if __name__ == '__main__':
+    main()
diff --git a/service/analyzer.py b/service/analyzer.py
index 8554c35..9db6b3f 100644
--- a/service/analyzer.py
+++ b/service/analyzer.py
@@ -60,8 +60,8 @@ def __init__(self, config):
             model_path = PACKAGE_ROOT / model_path
         model_path = model_path.resolve()
 
-        buy_labels = App.config["buy_labels"]
-        sell_labels = App.config["sell_labels"]
+        buy_labels = App.config["score_aggregation"]["buy_labels"]
+        sell_labels = App.config["score_aggregation"]["sell_labels"]
         self.models = {label: load_model_pair(model_path, label) for label in buy_labels + sell_labels}
 
     #
@@ -268,59 +268,8 @@ def store_queue(self):
         with open(file, 'a+') as f:
             f.write(data_str + "\n")
 
-    #
-    # Analysis (features, predictions, signals etc.)
-    #
-
-    def analyze(self):
-        """
-        1. Convert klines to df
-        2. Derive (compute) features (use same function as for model training)
-        3. Derive (predict) labels by applying models trained for each label
-        4. Generate buy/sell signals by applying rule models trained for best overall trade performance
-        """
-        symbol = App.config["symbol"]
-
-        last_kline_ts = self.get_last_kline_ts(symbol)
-        last_kline_ts_str = str(pd.to_datetime(last_kline_ts, unit='ms'))
-
-        log.info(f"Analyze {symbol}. Last kline timestamp: {last_kline_ts_str}")
-
-        #
-        # 1.
-        # MERGE: Produce a single data frame with input data from all sources
-        #
-        data_sources = App.config.get("data_sources", [])
-        if not data_sources:
-            data_sources = [{"folder": App.config["symbol"], "file": "klines", "column_prefix": ""}]
-
-        # Read data from online sources into data frames
-        for ds in data_sources:
-            if ds.get("file") == "klines":
-                try:
-                    klines = self.klines.get(ds.get("folder"))
-                    df = klines_to_df(klines)
-
-                    # Validate
-                    source_columns = ['open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_av', 'trades', 'tb_base_av', 'tb_quote_av']
-                    if df.isnull().any().any():
-                        null_columns = {k: v for k, v in df.isnull().any().to_dict().items() if v}
-                        log.warning(f"Null in source data found. Columns with Null: {null_columns}")
-                    # TODO: We might receive empty strings or 0s in numeric data - how can we detect them?
-                    # TODO: Check that timestamps in 'close_time' are strictly consecutive
-                except Exception as e:
-                    log.error(f"Error in klines_to_df method: {e}. Length klines: {len(klines)}")
-                    return
-            else:
-                log.error("Unknown data sources. Currently only 'klines' is supported. Check 'data_sources' in config, key 'file'")
-                return
-            ds["df"] = df
-
-        # Merge in one df with prefixes and common regular time index
-        df = merge_data_sources(data_sources)
-
+    def generate_features(self, df:pd.DataFrame, all:bool=False) -> pd.DataFrame:
         #
-        # 2.
         # Generate all necessary derived features (NaNs are possible due to short history)
         #
         # We want to generate features only for the last rows (for performance reasons)
@@ -338,13 +287,18 @@ def analyze(self):
         # Apply all feature generators to the data frame which get accordingly new derived columns
         # The feature parameters will be taken from App.config (depending on generator)
+        if all:
+            last_rows = 0
+
         for fs in feature_sets:
             df, _ = generate_feature_set(df, fs, last_rows=last_rows)
 
         df = df.iloc[-last_rows:]  # For signal generation, we will need only several last rows
+
+        return df
+
+    def generate_score(self, df:pd.DataFrame) -> pd.DataFrame:
         #
-        # 3.
         # Apply ML models and generate score columns
         #
@@ -384,12 +338,10 @@ def analyze(self):
         # This df contains only one (last) record
         df = df.join(score_df)
 
-        #df = pd.concat([predict_df, score_df], axis=1)
-        #
-        # 4.
-        # Aggregate and post-process
-        #
+        return df
+
+    def aggregate_post_process(self, df:pd.DataFrame) -> pd.DataFrame:
         sa_sets = ['score_aggregation', 'score_aggregation_2']
         for i, score_aggregation_set in enumerate(sa_sets):
             score_aggregation = App.config.get(score_aggregation_set)
@@ -411,7 +363,68 @@ def analyze(self):
             aggregate_scores(df, score_aggregation, sell_column, sell_labels)
             # Mutually adjust two independent scores with opposite semantics
             combine_scores(df, score_aggregation, buy_column, sell_column)
+        return df
+
+    #
+    # Analysis (features, predictions, signals etc.)
+    #
+
+    def analyze(self):
+        """
+        1. Convert klines to df
+        2. Derive (compute) features (use same function as for model training)
+        3. Derive (predict) labels by applying models trained for each label
+        4. Generate buy/sell signals by applying rule models trained for best overall trade performance
+        """
+        symbol = App.config["symbol"]
+
+        last_kline_ts = self.get_last_kline_ts(symbol)
+        last_kline_ts_str = str(pd.to_datetime(last_kline_ts, unit='ms'))
+
+        log.debug(f"Analyze {symbol}. Last kline timestamp: {last_kline_ts_str}")
+
+        #
+        # 1.
+        # MERGE: Produce a single data frame with input data from all sources
+        #
+        data_sources = App.config.get("data_sources", [])
+        if not data_sources:
+            data_sources = [{"folder": App.config["symbol"], "file": "klines", "column_prefix": ""}]
+        # Read data from online sources into data frames
+        for ds in data_sources:
+            if ds.get("file") == "klines":
+                try:
+                    klines = self.klines.get(ds.get("folder"))
+                    df = klines_to_df(klines)
+
+                    # Validate
+                    source_columns = ['open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_av', 'trades', 'tb_base_av', 'tb_quote_av']
+                    if df.isnull().any().any():
+                        null_columns = {k: v for k, v in df.isnull().any().to_dict().items() if v}
+                        log.warning(f"Null in source data found. Columns with Null: {null_columns}")
+                    # TODO: We might receive empty strings or 0s in numeric data - how can we detect them?
+                    # TODO: Check that timestamps in 'close_time' are strictly consecutive
+                except Exception as e:
+                    log.error(f"Error in klines_to_df method: {e}. Length klines: {len(klines)}")
+                    return
+            else:
+                log.error("Unknown data sources. Currently only 'klines' is supported. Check 'data_sources' in config, key 'file'")
+                return
+            ds["df"] = df
+
+        # Merge in one df with prefixes and common regular time index
+        df = merge_data_sources(data_sources)
+
+        # 2.
+        df = self.generate_features(df)
+
+        # 3.
+        df = self.generate_score(df)
+        #df = pd.concat([predict_df, score_df], axis=1)
+
+        # 4.
+        df = self.aggregate_post_process(df)
+
         #
         # 5.
         # Apply rule to last row
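For reference, here is a minimal sketch of the configuration entries that the new `scripts/backtest.py` and the refactored `Analyzer` read (`symbol`, `time_column`, `data_folder`, `feature_file_name`, `score_aggregation`, `signal_model`). The key names come from the code above; the values are placeholders, and the inner structure of `signal_model` is not defined in this change (it is consumed by `apply_rule_with_score_thresholds`), so it is left empty here:

```python
# Hypothetical config.json content shown as a Python dict; values are illustrative only.
config_sketch = {
    "symbol": "BTCUSDT",              # used to locate <data_folder>/<symbol>/<feature_file_name>.csv
    "time_column": "timestamp",       # parsed as the datetime index of the feature file
    "data_folder": "DATA",
    "feature_file_name": "features",
    "score_aggregation": {
        "buy_labels": ["high_10"],    # models loaded in Analyzer.__init__
        "sell_labels": ["low_10"],
    },
    "signal_model": {},               # thresholds for apply_rule_with_score_thresholds (not shown in this diff)
}
```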