Multi-dimensional/Multi-input/Multi-output Data Preprocessing and Batch Generators for TensorFlow Models
You can find the library on PyPI as `keras_generators`:

```shell
pip install keras_generators
```
This library solves several basic problems in the area of data preprocessing (scaling and encoding) and batch generation for TensorFlow models, for which there is currently no ready-made solution in TensorFlow or other open-source libraries. Several open-source libraries (scikit-learn, TensorFlow, tensorflow-datasets, and tensorflow-transform) solve some of the problems below, but only partially, and there is no way to combine them into a single solution without custom adaptation or extending their functionality.
Imagine you have timeseries data (like weather temperature or stock market prices) and you need to train a neural model to predict the next value in the sequence based on the sequential input. These are the basic operations you have to perform:
- You can use `TimeseriesGenerator` from TensorFlow, which is able to ingest multi-variate timeseries data and produce batches of inputs and targets. It can take time-series parameters such as stride and length of history, and produce batches for training/validation. But it cannot generate multi-step target data, nor work on train/test splits. You'll need to split the data manually, taking care of lookback and target data lengths, with all the possible edge cases of the stride and sampling parameters.
- You'll have to generate the target data yourself; there's no functionality in any of the above libraries to extract it from the ingested timeseries, and this might not be trivial, especially if you want to generate multi-step targets (like predicting 3 data points ahead).
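That target-extraction step can be sketched in plain NumPy. The `make_windows` helper and its parameter names below are illustrative only, not part of this library or any other:

```python
import numpy as np

def make_windows(series, lookback, pred_len, stride=1):
    """Slice a 1-D series into (input window, multi-step target) pairs.

    Each input is `lookback` consecutive points; each target is the
    `pred_len` points that immediately follow it. `stride` controls
    how far apart consecutive windows start.
    """
    X, y = [], []
    last_start = len(series) - lookback - pred_len
    for start in range(0, last_start + 1, stride):
        X.append(series[start:start + lookback])
        y.append(series[start + lookback:start + lookback + pred_len])
    return np.array(X), np.array(y)

series = np.arange(10.0)  # 0, 1, ..., 9
X, y = make_windows(series, lookback=4, pred_len=3)
# X[0] is [0, 1, 2, 3] and its 3-step target y[0] is [4, 5, 6]
```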
- You'll have to split the data into train/val/test sets yourself, and this is not trivial: simply splitting by index won't work, since you have the lookback to take care of, and the target data must be split perfectly aligned with the input data. `tf.data.Dataset` can do part of this using the `window`, `skip`, `take`, and `batch` methods, and it can align the targets (if you had those somehow generated), but it cannot re-shuffle the data at the end of the epoch, nor scale/normalize the data. Although `tft.scale_to_z_score` exists, it doesn't let you save the coefficients and scale the data for inference, rendering it unusable for production use.
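As a rough illustration of the alignment problem: window the raw series first (so the lookback is already accounted for), then split the resulting (input, target) pairs with the same cut indices. The `split_aligned` helper is hypothetical, shown only to make the requirement concrete:

```python
import numpy as np

def split_aligned(X, y, train_frac=0.6, val_frac=0.2):
    """Split windowed inputs and their targets with the same cut points,
    so every X[i] keeps its own y[i] in the same subset."""
    n = len(X)
    i = int(n * train_frac)
    j = int(n * (train_frac + val_frac))
    return (X[:i], y[:i]), (X[i:j], y[i:j]), (X[j:], y[j:])

# Window the raw series first, then split the *pairs*, not the raw series.
series = np.arange(20.0)
lookback, pred_len = 4, 2
starts = np.arange(len(series) - lookback - pred_len + 1)
X = np.stack([series[s:s + lookback] for s in starts])
y = np.stack([series[s + lookback:s + lookback + pred_len] for s in starts])
train, val, test = split_aligned(X, y)
```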
- You'll have to perform encoding and scaling of the data yourself, and take care of saving the coefficients and encoding parameters for inference. scikit-learn's `StandardScaler` and encoders can de-normalize the target predictions at inference time (`inverse_transform`) and store the coefficients needed to normalize new input data, but they have to be saved separately for every input and output: if you have a multi-input or multi-output network, you'll have to save a scaler for every I/O layer and apply each of them before inference.
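To make that bookkeeping concrete, here is a toy z-score scaler (not scikit-learn's `StandardScaler` and not this library's encoder) that stores its coefficients, kept in one instance per input/output layer:

```python
import numpy as np

class ZScaler:
    """Toy z-score scaler that remembers its coefficients,
    so the same fit can be reused at inference time."""
    def fit(self, x):
        self.mean_ = x.mean(axis=0)
        self.std_ = x.std(axis=0) + 1e-12  # avoid division by zero
        return self

    def transform(self, x):
        return (x - self.mean_) / self.std_

    def inverse_transform(self, x):
        return x * self.std_ + self.mean_

# One scaler per model input/output, keyed by layer name.
train = {"input": np.array([[1.0], [2.0], [3.0]]),
         "target": np.array([[10.0], [20.0], [30.0]])}
scalers = {name: ZScaler().fit(data) for name, data in train.items()}
scaled = {name: scalers[name].transform(data) for name, data in train.items()}
# At inference: denormalize predictions with the *target* scaler.
pred_scaled = np.array([[0.0]])
pred = scalers["target"].inverse_transform(pred_scaled)
```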
- You'll need to re-shuffle the data at the end of every epoch yourself, which means creating a custom batch generator class to be used by the TF trainer.
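A bare-bones sketch of such a generator, mirroring the `keras.utils.Sequence` protocol (`__len__`, `__getitem__`, `on_epoch_end`) without a Keras dependency:

```python
import numpy as np

class ShufflingBatchGenerator:
    """Minimal Sequence-style batch generator: yields (X, y) batches
    and reshuffles the sample order at the end of every epoch."""
    def __init__(self, X, y, batch_size, seed=0):
        self.X, self.y, self.batch_size = X, y, batch_size
        self.rng = np.random.default_rng(seed)
        self.order = np.arange(len(X))

    def __len__(self):  # number of batches per epoch
        return int(np.ceil(len(self.X) / self.batch_size))

    def __getitem__(self, i):
        idx = self.order[i * self.batch_size:(i + 1) * self.batch_size]
        return self.X[idx], self.y[idx]

    def on_epoch_end(self):  # called by the trainer after each epoch
        self.rng.shuffle(self.order)

gen = ShufflingBatchGenerator(np.arange(10), np.arange(10) * 2, batch_size=4)
```

Because the shuffle permutes a shared index array, each batch's inputs and targets stay aligned across epochs.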
TensorFlow models can have multiple inputs and multiple outputs. For example, a model can have 2 inputs (X1, X2) and 2 outputs (Y1, Y2). In this case, data preprocessing and batch generation must scale and encode each input and output separately, and split the data into train/test while keeping the input and target data aligned. This is not possible with the current Keras API or existing libraries.
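That alignment requirement boils down to applying one shared cut (or permutation) to every input and every target array. A minimal sketch, with a hypothetical `split_multi` helper:

```python
import numpy as np

def split_multi(inputs, targets, train_frac=0.8):
    """Split a multi-input/multi-output dataset with one shared cut,
    so sample i stays sample i in every input and every target."""
    n = len(next(iter(inputs.values())))
    cut = int(n * train_frac)
    train = ({k: v[:cut] for k, v in inputs.items()},
             {k: v[:cut] for k, v in targets.items()})
    test = ({k: v[cut:] for k, v in inputs.items()},
            {k: v[cut:] for k, v in targets.items()})
    return train, test

# Two inputs (X1, X2) and two outputs (Y1, Y2), all sample-aligned.
inputs = {"X1": np.arange(10), "X2": np.arange(10) + 100}
targets = {"Y1": np.arange(10) * 2, "Y2": np.arange(10) * 3}
(train_X, train_Y), (test_X, test_Y) = split_multi(inputs, targets)
```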
The `keras_generators` library addresses all of the above. With `TimeseriesDataSource` you can:

- split the data into train/test/val sets using `TimeseriesDataSource.split` and the splitters from the `keras_generators.splitters` module
- perform automatic encoding/scaling by passing `keras_generators.encoders.DataEncoder` instances as parameters
- decode/denormalize the predicted data
- generate multi-step target data using `TimeseriesTargetsParams` as a parameter
- perform an aligned split of multi-input/multi-output data into train/test/val sets, while at the same time fit-encoding/scaling the train data and reusing the fitted scalers and encoders for the validation and test data
`keras_generators.generators.XBatchGenerator` and `keras_generators.generators.XYBatchGenerator` are able to:

- generate batches of data for inference (`XBatchGenerator`) and training (`XYBatchGenerator`) for multi-input/multi-output models
- perform re-shuffling of the data at the end of the epoch
All the above classes are used in a pipeline, and you can find an example of their usage in the example model here.
Generate a multi-input/multi-step-output neural network model.

Multiple inputs: multi-variate timeseries + tabular data.
Output: multi-step timeseries target (predicting 3 data points ahead), with stride 2, on the second timeseries input (`target_idx=1`).
```python
# Module paths for the imports below follow the library modules referenced
# above (keras_generators.encoders, keras_generators.splitters,
# keras_generators.generators); exact locations may differ between versions.
from sklearn.preprocessing import MinMaxScaler, StandardScaler

from keras_generators.encoders import ScaleEncoder
from keras_generators.splitters import OrderedSplitter
from keras_generators.generators import (
    DataSet,
    TensorDataSource,
    TimeseriesDataSource,
    TimeseriesTargetsParams,
    TargetTimeseriesDataSource,
    XYBatchGenerator,
)

# input_df - input DataFrame with multi-variate timeseries data
price_input_ds = TimeseriesDataSource(
    name="input",
    tensors=input_df.values,
    length=60,
    target_params=TimeseriesTargetsParams(delay=0, pred_len=3, stride=2, target_idx=1),
)
# Z-score scale data (input & output)
encoded_input_ds = price_input_ds.encode([ScaleEncoder(StandardScaler())])
# Extract the multi-step targets (delay 0, prediction length 3, as configured above)
targets_ds = TargetTimeseriesDataSource.from_timeseries_datasource(encoded_input_ds, name="target")
# tabular_df - DataFrame with tabular data. We scale it with a MinMax scaler
tabular_ds = TensorDataSource(name="tabular", tensors=tabular_df.values).encode([ScaleEncoder(MinMaxScaler())])
# Get train/val/test generators for Keras
dataset = DataSet(
    input_sources={encoded_input_ds.name: encoded_input_ds, "tabular": tabular_ds},
    target_sources={targets_ds.name: targets_ds},
)
train_ds, val_ds, test_ds = dataset.split(splitter=OrderedSplitter(train=0.6, val=0.2))
train_gen = XYBatchGenerator(train_ds.input_sources, train_ds.target_sources, batch_size=32)
val_gen = XYBatchGenerator(val_ds.input_sources, val_ds.target_sources, batch_size=1024)
test_gen = XYBatchGenerator(test_ds.input_sources, test_ds.target_sources, batch_size=1024)
# Train model (a compiled Keras model with matching input/output layer names)
history = model.fit(train_gen, epochs=20, validation_data=val_gen)
# Inference: de-normalize/decode the predictions
y_pred = model.predict(test_gen.get_X_generator())
res_ds = TensorDataSource(name="prediction", tensors=y_pred, encoders=targets_ds.get_encoders())
unscaled_y_pred = res_ds.decode()[:]
```
Changelog

All notable changes to this project will be documented in this file.
- Explicit use of tf_keras in model_object.py
- Removed serialization of the model in the callbacks
- Renamed SerializableKerasObject -> SerializableCallback
- Fixed callbacks to work on TF 2.16
- Added integration tests for the callbacks.py module
- Use the dill serializer instead of pickle for serialization of the Callbacks
- Dropped support for Python <3.10 in setup.py
- Upgraded ModelParams to use pydantic>=2.*
- Proper serialization of Callbacks using cloudpickle + dependency on cloudpickle
- Added common SerializableKerasObject
- Use legacy Keras (2.*) instead of the new 3.0 by explicitly importing tf_keras lib
- Integration tests for Callback serialization
- Upgrade to support TensorFlow 2.16, Python 3.12.
- Drop support for Python versions < 3.10.
- Drop support for TensorFlow versions < 2.16.
- Drop support for pydantic 1.x.
- Added `state_autoclear` option to `ModelObject` as a workaround for memory leaks in Keras; it automatically calls `K.clear_session()` once every N calls to predict/evaluate.
- Added `XYWBatchGenerator` to handle sample weights.
- `ModelParams` now inherits `ImmutableExtModel` from pyxtension.
- Removed unused custom Models from common.
- Model directory name will now be prefixed with 'TS' instead of suffixed.
- Added "reverse" option to `OrderedSplitter`.
- Added `add_default_callbacks` parameter to `ModelObject.train()`.
- Now accepts empty validation and test in data split.
- Removed fixed protobuf dependency due to TensorFlow upgrade.
- Major improvements and bug fixes in `CompoundDataSource`.
- Added `callbacks.py`, various data encoders, and unchain functionality for `CompoundDataSource`.
- Added `predict_raw` and `evaluate_raw` methods, and `MetrickCheckpoint` as a default callback to `SimpleModelObject`.
- Code reformat with black.
- Added `ChainedDataEncoder` and `CompoundDataEncoder`.
- Fixed `CompoundDataSource` to use the new encoders.
- Extended `TensorsDataSource.__getitem__` to accept numpy int indexing.
- Bugfix for generators.
- Added `CompoundDataSource`.
- Added `DataSet.get_encoders()` and `split()` methods.
- Improved typing annotations.
- Added usage example to README.
- Fixed regression in `SimpleModelObject.from_model_dir()`.
- Adjusted the example to save and load the trained model to/from disk.
- Added `TargetTimeseriesDataSource`, default layer names to `ModelParams`, and `DataSource.select_features()`.
- `TimeseriesDataSource.get_targets()` now returns `TensorDataSource`.
- Moved several classes to common.py.
- Updated README.md with the motivation of the library.
- Added model abstractions (ModelParams & ModelObject).
- Added examples with model training using keras-generators.
- Introduced data encoders, DataSource based generators, and adapted existing Splitters to new class architecture.
- Fixed imports for compatibility with breaking changes in TensorFlow 2.9.
- First commit with initial functionality.