-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
70 lines (58 loc) · 3.32 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import math

import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.python.keras as keras
from keras.callbacks import ModelCheckpoint
def custom_mae_loss(y_true, y_pred):
    """Return the mean absolute error between target and predicted next values.

    Only column 1 of ``y_true`` and column 0 of ``y_pred`` take part in the
    loss; both are cast to float64 before being compared. The comparison is
    done on the values as given (the original author notes they are scaled).

    Args:
        y_true: 2-D tensor whose column 1 holds the true next value.
        y_pred: 2-D tensor whose column 0 holds the predicted next value.

    Returns:
        Scalar float64 tensor: the mean of ``|y_true[:, 1] - y_pred[:, 0]|``.
    """
    target = tf.cast(y_true[:, 1], tf.float64)
    estimate = tf.cast(y_pred[:, 0], tf.float64)
    return tf.reduce_mean(tf.abs(target - estimate))
def dir_acc(y_true, y_pred):
    """Return the fraction of samples whose predicted move has the right sign.

    ``y_true`` is read column-wise: 0 = previous value, 1 = next value,
    2 = mean and 3 = std used for scaling. The previous, next and predicted
    values are un-scaled with ``value * std + mean`` before the changes
    relative to the previous value are compared.

    Args:
        y_true: 2-D tensor with at least the four columns described above.
        y_pred: 2-D tensor whose column 0 holds the (scaled) predicted next
            value.

    Returns:
        Scalar float64 tensor in [0, 1]: the share of rows where
        ``sign(next - prev)`` equals ``sign(pred - prev)``. Two zero changes
        count as a match because their signs are equal.
    """

    def _column(tensor, index):
        # Pull one column out and promote it to float64.
        return tf.cast(tensor[:, index], tf.float64)

    offset = _column(y_true, 2)
    scale = _column(y_true, 3)

    def _restore(values):
        # Undo the scaling: scaled * std + mean.
        return (values * scale) + offset

    prev_price = _restore(_column(y_true, 0))
    next_price = _restore(_column(y_true, 1))
    forecast = _restore(_column(y_pred, 0))

    actual_sign = tf.sign(next_price - prev_price)
    forecast_sign = tf.sign(forecast - prev_price)

    hits = tf.cast(tf.equal(actual_sign, forecast_sign), tf.float64)
    return tf.reduce_mean(hits)
# Checkpoint that keeps the model with the best *training* directional accuracy.
# NOTE(review): the "dir_acc" key presumably comes from compiling the model with
# the dir_acc metric defined in this file -- confirm at the call site. Monitoring
# "loss" instead would also require switching to mode="min".
checkpoint_callback_train = ModelCheckpoint(
    filepath="transformer_train_model.keras",  # Where the best model is written
    monitor="dir_acc",  # Logged quantity that decides what "best" means
    mode="max",  # Higher monitored value is better
    save_best_only=True,  # Overwrite only when the monitored value improves
    verbose=1,  # Print a message when a checkpoint is saved
)
# Checkpoint that keeps the model with the best *validation* directional accuracy.
# NOTE(review): the "val_dir_acc" key presumably requires validation data plus
# the dir_acc metric at compile time -- confirm at the call site. Monitoring
# "val_loss" instead would also require switching to mode="min".
checkpoint_callback_val = ModelCheckpoint(
    filepath="transformer_val_model.keras",  # Where the best model is written
    monitor="val_dir_acc",  # Logged quantity that decides what "best" means
    mode="max",  # Higher monitored value is better
    save_best_only=True,  # Overwrite only when the monitored value improves
    verbose=1,  # Print a message when a checkpoint is saved
)
def get_lr_callback(batch_size=16, mode='cos', epochs=500, plot=False):
    """Build a Keras learning-rate scheduler with linear warm-up and cosine decay.

    The schedule is laid out over ``epochs`` epochs:

    1. Warm-up (first 30% of ``epochs``): linear ramp from ``lr_start`` to
       ``lr_max``.
    2. Sustain: hold ``lr_max`` for ``lr_sus_ep`` epochs.
    3. Decay: for ``mode='cos'`` a half-cosine from ``lr_max`` towards
       ``lr_min``; any other mode falls straight to ``lr_min``.

    Args:
        batch_size: Accepted for backward compatibility; the schedule does not
            use it.
        mode: Decay shape. Only ``'cos'`` is implemented.
        epochs: Number of epochs the schedule spans.
        plot: When true, plot the learning rate for every epoch with
            matplotlib before returning.

    Returns:
        A ``tf.keras.callbacks.LearningRateScheduler`` wrapping the schedule.
    """
    lr_start, lr_max, lr_min = 0.0001, 0.005, 0.00001  # Learning-rate bounds
    lr_ramp_ep = int(0.30 * epochs)  # 30% of epochs for warm-up
    # NOTE(review): int(0.10 * epochs) never exceeds lr_ramp_ep, so this is
    # always 0 and the sustain phase is effectively disabled -- confirm that
    # this is intended before changing the fractions.
    lr_sus_ep = max(0, int(0.10 * epochs) - lr_ramp_ep)
    decay_start = lr_ramp_ep + lr_sus_ep
    decay_total_epochs = epochs - decay_start

    def lrfn(epoch):
        """Return the learning rate for the given zero-based epoch index."""
        if epoch < lr_ramp_ep:  # Warm-up phase
            return (lr_max - lr_start) / lr_ramp_ep * epoch + lr_start
        if epoch < decay_start:  # Sustain phase at max learning rate
            return lr_max
        if mode == 'cos' and decay_total_epochs > 0:
            decay_epoch_index = epoch - decay_start
            phase = math.pi * decay_epoch_index / decay_total_epochs
            return (lr_max - lr_min) * 0.5 * (1 + math.cos(phase)) + lr_min
        # Unrecognised mode, or no room left for a decay window (which would
        # otherwise divide by zero): fall back to the minimum learning rate.
        return lr_min

    if plot:  # Plot the learning-rate curve
        epoch_axis = list(range(epochs))
        plt.figure(figsize=(10, 5))
        plt.plot(epoch_axis, [lrfn(epoch) for epoch in epoch_axis], marker='o')
        plt.xlabel('Epoch')
        plt.ylabel('Learning Rate')
        plt.title('Learning Rate Scheduler')
        plt.show()

    return tf.keras.callbacks.LearningRateScheduler(lrfn, verbose=True)