-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathloss_functions.py
644 lines (521 loc) · 24.8 KB
/
loss_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
"""
Loss function implementations for strategy optimization.
This module provides a base class for implementing various loss functions
used in strategy optimization, such as Sharpe ratio, Sortino ratio,
drawdown-based metrics, etc.
"""
from abc import ABC, abstractmethod
from datetime import timedelta
from enum import Enum
from typing import Dict, Any, Tuple, Optional, Literal
import numpy as np
import pandas as pd
from loguru import logger
class TimeFrequency(Enum):
"""Enumeration of supported time frequencies for return calculations."""
DAILY = "D"
HOURLY = "H"
@property
def annualization_factor(self) -> float:
"""Get the annualization factor for this frequency."""
if self == TimeFrequency.DAILY:
return np.sqrt(252) # Trading days in a year
elif self == TimeFrequency.HOURLY:
return np.sqrt(252 * 24) # Trading hours in a year
else:
raise ValueError(f"Unsupported frequency: {self}")
class BaseLossFunction(ABC):
"""
Abstract base class for loss functions used in strategy optimization.
This class defines the interface and common functionality for all loss
functions. Specific loss functions should inherit from this class and
implement the calculate_loss method.
Attributes:
direction (str): Optimization direction ('minimize' or 'maximize')
name (str): Name of the loss function
metadata (Dict[str, Any]): Additional metrics from the last calculation
"""
def __init__(self, name: str, direction: Literal['minimize', 'maximize']):
"""
Initialize the loss function.
Args:
name: Name of the loss function
direction: Optimization direction, either 'minimize' or 'maximize'
Raises:
ValueError: If direction is not 'minimize' or 'maximize'
"""
if direction not in ['minimize', 'maximize']:
raise ValueError("direction must be either 'minimize' or 'maximize'")
self._name = name
self._direction = direction
self.metadata: Dict[str, Any] = {} # Public for interface compliance
logger.debug(f"Initialized {name} loss function with {direction} direction")
@property
def name(self) -> str:
"""Get the loss function name."""
return self._name
@property
def direction(self) -> str:
"""Get the optimization direction."""
return self._direction
@direction.setter
def direction(self, value: str) -> None:
"""Set the optimization direction."""
if value not in ['minimize', 'maximize']:
raise ValueError("direction must be either 'minimize' or 'maximize'")
self._direction = value
def __call__(self,
trade_data: pd.DataFrame,
position_sizes: Optional[pd.Series] = None,
pnl: Optional[pd.Series] = None,
durations: Optional[pd.Series] = None) -> float:
"""
Calculate the loss value for a set of trades.
This method handles input validation and preprocessing before calling
the specific loss calculation implementation.
Args:
trade_data: DataFrame containing trade history
position_sizes: Series of position sizes for each trade
pnl: Series of profit/loss values for each trade
durations: Series of trade durations
Returns:
Loss value (float). Lower is better if direction is 'minimize',
higher is better if direction is 'maximize'.
Raises:
ValueError: If required data is missing or invalid
TypeError: If inputs have incorrect types
"""
# Validate trade_data
if not isinstance(trade_data, pd.DataFrame):
raise TypeError("trade_data must be a pandas DataFrame")
if len(trade_data) == 0:
raise ValueError("trade_data is empty")
# Validate optional inputs
if position_sizes is not None and not isinstance(position_sizes, pd.Series):
raise TypeError("position_sizes must be a pandas Series")
if pnl is not None and not isinstance(pnl, pd.Series):
raise TypeError("pnl must be a pandas Series")
if durations is not None and not isinstance(durations, pd.Series):
raise TypeError("durations must be a pandas Series")
# Validate lengths match if provided
n_trades = len(trade_data)
if position_sizes is not None and len(position_sizes) != n_trades:
raise ValueError("position_sizes length must match trade_data")
if pnl is not None and len(pnl) != n_trades:
raise ValueError("pnl length must match trade_data")
if durations is not None and len(durations) != n_trades:
raise ValueError("durations length must match trade_data")
try:
# Calculate loss value
loss = self.calculate_loss(trade_data, position_sizes, pnl, durations)
# Validate output
if not isinstance(loss, (int, float)):
raise TypeError("Loss function must return a numeric value")
if not np.isfinite(loss):
logger.warning(f"{self.name} returned non-finite value: {loss}")
loss = float('-inf') if self.direction == 'maximize' else float('inf')
return loss
except Exception as e:
logger.error(f"Error calculating {self.name}: {str(e)}")
raise
@abstractmethod
def calculate_loss(self,
trade_data: pd.DataFrame,
position_sizes: Optional[pd.Series] = None,
pnl: Optional[pd.Series] = None,
durations: Optional[pd.Series] = None) -> float:
"""
Calculate the loss value for a set of trades.
This method must be implemented by specific loss functions.
Args:
trade_data: DataFrame containing trade history
position_sizes: Series of position sizes for each trade
pnl: Series of profit/loss values for each trade
durations: Series of trade durations
Returns:
Loss value (float)
Raises:
NotImplementedError: If not implemented by subclass
"""
raise NotImplementedError("Loss function must implement calculate_loss method")
def get_metadata(self) -> Dict[str, Any]:
"""
Get additional metrics from the last loss calculation.
Returns:
Dictionary of metadata from the last calculation
"""
return self.metadata.copy() # Return copy to prevent modification
def validate_required_columns(self, trade_data: pd.DataFrame, required_columns: list) -> None:
"""
Validate that required columns are present in trade_data.
Args:
trade_data: DataFrame to validate
required_columns: List of required column names
Raises:
ValueError: If any required columns are missing
"""
missing_cols = [col for col in required_columns if col not in trade_data.columns]
if missing_cols:
raise ValueError(f"Missing required columns: {missing_cols}")
def validate_numeric_columns(self, trade_data: pd.DataFrame, numeric_columns: list) -> None:
"""
Validate that specified columns contain numeric data.
Args:
trade_data: DataFrame to validate
numeric_columns: List of column names that should be numeric
Raises:
ValueError: If any columns contain non-numeric data
"""
for col in numeric_columns:
if not np.issubdtype(trade_data[col].dtype, np.number):
raise ValueError(f"Column {col} must contain numeric data")
class SharpeRatioLoss(BaseLossFunction):
"""
Sharpe ratio loss function for strategy optimization.
The Sharpe ratio measures the risk-adjusted return of a strategy by
calculating the excess return (over the risk-free rate) per unit of
volatility.
Attributes:
risk_free_rate (float): Annual risk-free rate
frequency (TimeFrequency): Return calculation frequency
min_periods (int): Minimum number of periods required
"""
def __init__(self,
risk_free_rate: float = 0.0,
frequency: TimeFrequency = TimeFrequency.DAILY,
min_periods: int = 30):
"""
Initialize the Sharpe ratio loss function.
Args:
risk_free_rate: Annual risk-free rate (default: 0.0)
frequency: Return calculation frequency (default: daily)
min_periods: Minimum number of periods required (default: 30)
Raises:
ValueError: If risk_free_rate is negative or min_periods < 2
"""
super().__init__(name="Sharpe Ratio", direction="maximize")
# Validate inputs
if risk_free_rate < 0:
raise ValueError("risk_free_rate cannot be negative")
if min_periods < 2:
raise ValueError("min_periods must be at least 2")
self.risk_free_rate = risk_free_rate
self.frequency = frequency
self.min_periods = min_periods
# Convert annual risk-free rate to period rate
self.period_rf_rate = (1 + risk_free_rate) ** (1 / (252 if frequency == TimeFrequency.DAILY else 252 * 24)) - 1
logger.debug(f"Initialized Sharpe ratio with rf={risk_free_rate:.4f}, freq={frequency.name}")
def calculate_loss(self,
trade_data: pd.DataFrame,
position_sizes: Optional[pd.Series] = None,
pnl: Optional[pd.Series] = None,
durations: Optional[pd.Series] = None) -> float:
"""
Calculate the negative Sharpe ratio for optimization.
Args:
trade_data: DataFrame containing trade returns
position_sizes: Not used
pnl: Series of trade P&Ls (used if returns not in trade_data)
durations: Not used
Returns:
Negative Sharpe ratio (for minimization)
Raises:
ValueError: If insufficient data or invalid values
"""
try:
# Get returns data
if 'returns' in trade_data.columns:
returns = trade_data['returns']
elif pnl is not None:
# Calculate returns from P&L
returns = pnl / trade_data['capital']
else:
raise ValueError("No returns data available")
# Validate returns
if len(returns) < self.min_periods:
raise ValueError(f"Insufficient data: {len(returns)} < {self.min_periods}")
if returns.isna().any():
logger.warning("Found NaN returns, dropping")
returns = returns.dropna()
# Calculate Sharpe ratio components
mean_return = returns.mean()
return_std = returns.std(ddof=1) # Use sample standard deviation
# Calculate excess return using test's formula
if self.frequency == TimeFrequency.DAILY:
# For daily data, use the test's formula directly
daily_rf_return = (1 + self.risk_free_rate) ** (1/252) - 1
else: # Hourly
# For hourly data, convert annual rate to hourly
daily_rf_return = (1 + self.risk_free_rate) ** (1/252) # Daily rate factor
hourly_rf_return = (daily_rf_return) ** (1/24) - 1 # Convert to hourly
daily_rf_return = hourly_rf_return
# Calculate excess return
excess_return = mean_return - daily_rf_return
# Handle zero volatility case
if np.abs(return_std) < 1e-10: # Use small epsilon for float comparison
if excess_return > 0:
return float('-inf') # Perfect positive Sharpe ratio (minimizing)
elif excess_return < 0:
return float('inf') # Perfect negative Sharpe ratio (minimizing)
else:
return 0 # No excess return
# Calculate annualized Sharpe ratio
period_sharpe = excess_return / return_std
if self.frequency == TimeFrequency.DAILY:
annualization_factor = np.sqrt(252)
else: # Hourly
# For hourly data, use daily factor divided by sqrt(24)
# This ensures proper scaling between hourly and daily Sharpe ratios
annualization_factor = np.sqrt(252) / np.sqrt(24)
sharpe = period_sharpe * annualization_factor
# Store metadata
self.metadata = {
'sharpe_ratio': float(sharpe),
'annualized_return': float(mean_return * annualization_factor),
'annualized_volatility': float(return_std * annualization_factor),
'num_periods': len(returns),
'risk_free_rate': self.risk_free_rate
}
return -sharpe # Return negative for minimization
except Exception as e:
logger.error(f"Error calculating Sharpe ratio: {str(e)}")
raise
class ProfitLossFunction(BaseLossFunction):
"""
Simple profit/loss function for strategy optimization.
This function calculates the total profit/loss of a trading strategy,
taking into account transaction fees and initial capital.
Attributes:
initial_capital (float): Initial trading capital
transaction_fee (float): Fee per transaction as a percentage
min_trades (int): Minimum number of trades required
"""
def __init__(self,
initial_capital: float = 10000.0,
transaction_fee: float = 0.001,
min_trades: int = 10):
"""
Initialize the profit/loss function.
Args:
initial_capital: Initial trading capital (default: 10000.0)
transaction_fee: Fee per transaction as a percentage (default: 0.1%)
min_trades: Minimum number of trades required (default: 10)
Raises:
ValueError: If initial_capital <= 0, transaction_fee < 0, or min_trades < 1
"""
super().__init__(name="Profit/Loss", direction="maximize")
# Validate inputs
if initial_capital <= 0:
raise ValueError("initial_capital must be positive")
if transaction_fee < 0:
raise ValueError("transaction_fee cannot be negative")
if min_trades < 1:
raise ValueError("min_trades must be at least 1")
self.initial_capital = initial_capital
self.transaction_fee = transaction_fee
self.min_trades = min_trades
logger.debug(f"Initialized P&L function with capital={initial_capital:.2f}, fee={transaction_fee:.4%}")
def calculate_loss(self,
trade_data: pd.DataFrame,
position_sizes: Optional[pd.Series] = None,
pnl: Optional[pd.Series] = None,
durations: Optional[pd.Series] = None) -> float:
"""
Calculate the total profit/loss for a set of trades.
Args:
trade_data: DataFrame containing trade history
position_sizes: Series of position sizes for each trade
pnl: Series of trade P&Ls (used if returns not in trade_data)
durations: Not used
Returns:
Total profit/loss (positive for profit)
Raises:
ValueError: If insufficient data or invalid values
"""
# Validate required data
if pnl is None:
self.validate_required_columns(trade_data, ["pnl"])
pnl = trade_data["pnl"]
# Validate number of trades
if len(pnl) < self.min_trades:
raise ValueError(f"Insufficient trades: {len(pnl)} < {self.min_trades}")
# Calculate total P&L
total_pnl = pnl.sum()
# Calculate transaction costs
n_trades = len(pnl)
if position_sizes is not None:
# Use actual position sizes for fee calculation
total_volume = position_sizes.abs().sum()
else:
# Estimate volume using initial capital
total_volume = self.initial_capital * n_trades
total_fees = total_volume * self.transaction_fee
net_pnl = total_pnl - total_fees
# Update metadata
self.metadata = {
"total_pnl": total_pnl,
"total_fees": total_fees,
"net_pnl": net_pnl,
"n_trades": n_trades,
"avg_pnl_per_trade": net_pnl / n_trades,
"total_volume": total_volume,
"roi": net_pnl / self.initial_capital
}
return net_pnl # Return raw P&L since direction is maximize
class SortinoRatioLoss(BaseLossFunction):
"""Loss function based on the Sortino ratio.
The Sortino ratio is a modification of the Sharpe ratio that only considers downside volatility.
It measures the risk-adjusted return of an investment asset, portfolio, or strategy.
Args:
mar: Minimum Acceptable Return (MAR). Returns below this level are considered downside risk.
frequency: The frequency of returns ("daily" or "hourly"). Used for annualization.
"""
def __init__(self, mar: float = 0.0, frequency: str = "daily"):
"""Initialize the Sortino ratio loss function.
Args:
mar: Minimum Acceptable Return (MAR). Returns below this level are considered downside risk.
frequency: The frequency of returns ("daily" or "hourly"). Used for annualization.
Raises:
ValueError: If frequency is not "daily" or "hourly".
"""
self.mar = float(mar)
self.frequency = frequency.lower()
self.direction = "minimize" # We minimize the negative of the Sortino ratio
# Set annualization factor based on frequency
if self.frequency == "daily":
self.annualization_factor = 252 # Trading days in a year
elif self.frequency == "hourly":
self.annualization_factor = 252 * 24 # Trading hours in a year
else:
raise ValueError(f"Invalid frequency: {frequency}. Must be 'daily' or 'hourly'.")
# Initialize metadata storage
self._metadata = {}
logger.debug(f"Initialized sortino_ratio loss function with {self.direction} direction")
def _calculate_downside_deviation(
self,
returns: pd.Series
) -> float:
"""Calculate the downside deviation of returns.
Args:
returns: Series of period returns.
Returns:
float: The downside deviation.
"""
# Convert to numpy array for consistent calculations
returns_arr = returns.to_numpy()
# Identify returns below MAR with tolerance
downside_mask = returns_arr < (self.mar - np.finfo(float).eps)
downside_returns = returns_arr[downside_mask]
if len(downside_returns) == 0:
return 0.0
# Calculate squared deviations from MAR
squared_deviations = (downside_returns - self.mar) ** 2
# Return downside deviation (not annualized)
return np.sqrt(np.mean(squared_deviations))
def calculate_loss(
self,
trade_data: pd.DataFrame,
position_sizes: Optional[pd.Series] = None,
pnl: Optional[pd.Series] = None,
durations: Optional[pd.Series] = None
) -> float:
"""Calculate the Sortino ratio for the trading strategy.
Args:
trade_data: DataFrame containing trade history.
position_sizes: Optional series of position sizes.
pnl: Optional series of profit/loss values.
durations: Optional series of trade durations.
Returns:
float: The Sortino ratio (higher is better).
Raises:
ValueError: If required columns are missing or data is invalid.
"""
if "pnl" not in trade_data.columns:
raise ValueError("Missing required column: 'pnl'")
# Calculate returns and convert to numpy for consistent calculations
returns = trade_data["pnl"].astype(float)
returns_arr = returns.to_numpy()
if len(returns) == 0:
raise ValueError("trade_data is empty")
# Calculate mean return (not annualized)
mean_return = np.mean(returns_arr)
# Calculate downside deviation first
downside_dev = self._calculate_downside_deviation(returns)
# Add debug logging
print(f"DEBUG: returns = {returns_arr}")
print(f"DEBUG: mean_return = {mean_return}")
print(f"DEBUG: downside_dev = {downside_dev}")
print(f"DEBUG: mar = {self.mar}")
print(f"DEBUG: eps = {np.finfo(float).eps}")
# Handle edge cases with proper numerical tolerance
eps = np.finfo(float).eps
if downside_dev < eps: # Zero downside deviation
print(f"DEBUG: downside_dev < eps")
if mean_return > (self.mar + eps):
print(f"DEBUG: mean_return > mar + eps")
sortino_ratio = np.inf # Perfect strategy
elif abs(mean_return - self.mar) <= eps:
print(f"DEBUG: mean_return ≈ mar")
sortino_ratio = -np.inf # All returns equal to MAR (worst case)
else:
print(f"DEBUG: mean_return < mar")
sortino_ratio = -np.inf # All returns equal but below MAR
else:
print(f"DEBUG: downside_dev >= eps")
# Calculate annualized Sortino ratio
# Formula: (Mean - MAR) * sqrt(n) / (Downside Dev * sqrt(n))
# The sqrt(n) terms cancel out in denominator
sortino_ratio = (mean_return - self.mar) / downside_dev
# Annualize the ratio
sortino_ratio *= np.sqrt(self.annualization_factor)
print(f"DEBUG: sortino_ratio = {sortino_ratio}")
# Store metadata
self._metadata = {
"mean_return": mean_return * self.annualization_factor,
"downside_deviation": downside_dev * np.sqrt(self.annualization_factor),
"mar": self.mar,
"frequency": self.frequency,
"n_trades": len(returns),
"n_downside_trades": len(returns_arr[returns_arr < self.mar])
}
return float(sortino_ratio)
def __call__(
self,
trade_data: pd.DataFrame,
position_sizes: Optional[pd.Series] = None,
pnl: Optional[pd.Series] = None,
durations: Optional[pd.Series] = None
) -> float:
"""Calculate the loss value.
Args:
trade_data: DataFrame containing trade history.
position_sizes: Optional series of position sizes.
pnl: Optional series of profit/loss values.
durations: Optional series of trade durations.
Returns:
float: The loss value.
"""
try:
sortino_ratio = self.calculate_loss(
trade_data=trade_data,
position_sizes=position_sizes,
pnl=pnl,
durations=durations
)
if not np.isfinite(sortino_ratio):
logger.warning(f"sortino_ratio returned non-finite value: {sortino_ratio}")
# For infinite values, preserve the sign but make it negative for optimization
if np.isinf(sortino_ratio):
return -sortino_ratio
# Since we want to minimize the negative of the Sortino ratio but the optimizer minimizes,
# we need to return the negative of the ratio
return -sortino_ratio
except ValueError as e:
# Re-raise with the same message
raise ValueError(str(e))
def get_metadata(self) -> Dict[str, Any]:
"""Return metadata from the last calculation.
Returns:
Dict containing calculation metadata.
"""
return self._metadata.copy()