Merge pull request #23 from RWTH-EBC/22-add-data-conversion-funtion-to-data-logger

22 add data conversion funtion to data logger
Jun-Jiang-92 authored Oct 29, 2024
2 parents 06b8880 + 99a69d0 commit 83647f6
Showing 13 changed files with 298 additions and 36 deletions.
35 changes: 25 additions & 10 deletions README.md
@@ -6,22 +6,37 @@

Data logging is the process of acquiring data over time from various sources, typically using sensors or instruments,
and storing them in one or multiple outputs, such as files or databases.
This Python package provides easy understandable interfaces for various data sources and outputs, facilitating a quick
This Python package provides easily understandable interfaces for various data sources and outputs, facilitating a quick
and easy configuration for data logging and data transfer.

Potential use cases include field measurements, test bench monitoring, and Hardware-in-the-Loop (HiL) development.
With its versatile capabilities, this toolbox aims to enhance the efficiency of data acquisition processes across
different applications.

## Data logger

As the key component in the data logging process, the data logger in this toolbox ensures high flexibility in the
logging procedure, featuring the following capabilities:

- Read and write data from and to multiple systems simultaneously
- Rename each variable in data sources for each output individually
- Automatically prefix variable names to avoid duplicates in data outputs
- Perform data type conversion for each variable in data sources for each data output individually

The following types of data loggers are available in the toolbox:

- Periodic trigger (time trigger)
- MQTT on-message trigger
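
A minimal configuration sketch is shown below. The source and output objects (`my_source`, `my_csv_output`) and the variable names are illustrative placeholders, not package API; only the constructor and `run_data_logging` signatures are taken from `DataLogger.py`:

```python
from ebcmeasurements.Base import DataLogger

# 'my_source' and 'my_csv_output' stand for any configured data source and
# data output objects; they are assumptions for this sketch.
data_logger = DataLogger.DataLoggerTimeTrigger(
    data_sources_mapping={'Source1': my_source},
    data_outputs_mapping={'Output1': my_csv_output},
    # Convert variable 'Temp1' to float, but only for 'Output1'
    data_type_conversion_mapping={'Source1': {'Output1': {'Temp1': 'float'}}},
    # Log 'Temp1' under the name 'T_room' in 'Output1'
    data_rename_mapping={'Source1': {'Output1': {'Temp1': 'T_room'}}},
)
data_logger.run_data_logging(interval=10, duration=3600)  # every 10 s, for 1 h
```

Variables without an entry in `data_type_conversion_mapping` keep the data type provided by their source; variables without a rename entry are logged under their source names, prefixed automatically where needed to avoid duplicates.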

## Currently supported systems

The toolbox currently supports the following platforms and protocols:
The toolbox currently supports the following platforms and protocols, as shown in the table:

| System | Read from system<br>(data source) | Write to system<br>(data output) | Note |
|:-------------------------------------------------------:|:---------------------------------:|:--------------------------------:|:-----|
| [Beckhoff PLC](https://www.beckhoff.com/) | Yes | Yes | - |
| [ICP DAS](https://www.icpdas.com/) | Yes | Yes (not tested) | Currently, the package only supports the [DCON Based I/O Expansion Unit](https://www.icpdas.com/en/product/guide+Remote__I_O__Module__and__Unit+Ethernet__I_O__Modules+IO__Expansion__Unit) with the I-87K series. |
| [MQTT protocol](https://mqtt.org/) | Yes | Yes | - |
| [The Things Network](https://www.thethingsnetwork.org/) | Yes | Yes (not tested) | Communication is via the MQTT server provided by The Things Stack. |
| [Sensor Electronic](http://sensor-electronic.pl/) | Yes | No | The package supports the Air Distribution Measuring System ([AirDistSys 5000](http://sensor-electronic.pl/pdf/KAT_AirDistSys5000.pdf)) and the Thermal Condition Monitoring System ([ThermCondSys 5500](http://sensor-electronic.pl/pdf/KAT_ThermCondSys5500.pdf)). Device configuration is possible, but it is not directly accessible via the data source API. |

- [Beckhoff PLC](https://www.beckhoff.com/)
- [ICP DAS](https://www.icpdas.com/) (Currently, the package only supports the
[DCON Based I/O Expansion Unit](https://www.icpdas.com/en/product/guide+Remote__I_O__Module__and__Unit+Ethernet__I_O__Modules+IO__Expansion__Unit)
with the I-87K series.)
- [MQTT protocol](https://mqtt.org/)
- [Sensor Electronic](http://sensor-electronic.pl/), which includes the air distribution measuring system
[AirDistSys 5000](http://sensor-electronic.pl/pdf/KAT_AirDistSys5000.pdf), and thermal condition monitoring system
[ThermCondSys 5500](http://sensor-electronic.pl/pdf/KAT_ThermCondSys5500.pdf)
142 changes: 127 additions & 15 deletions ebcmeasurements/Base/DataLogger.py
@@ -11,10 +11,14 @@


class DataLoggerBase(ABC):
# Class attribute: supported types for data type conversion
_types_of_data_type_conversion = ('str', 'int', 'float', 'bool', 'bytes')

def __init__(
self,
data_sources_mapping: dict[str, DataSource.DataSourceBase | DataSourceOutput.DataSourceOutputBase],
data_outputs_mapping: dict[str, DataOutput.DataOutputBase | DataSourceOutput.DataSourceOutputBase],
data_type_conversion_mapping: dict[str, dict[str, dict[str, str]]] | None = None,
data_rename_mapping: dict[str, dict[str, dict[str, str]]] | None = None,
**kwargs
):
@@ -35,6 +39,31 @@ def __init__(
...
}
The format of data_type_conversion_mapping is as follows:
{
    '<source1_name>': {
        '<output1_name>': {
            <variable_name_in_source1>: <type_to_be_converted>,
            ...
        },
        '<output2_name>': {
            <variable_name_in_source1>: <type_to_be_converted>,
            ...
        },
    },
    '<source2_name>': {
        '<output1_name>': {
            <variable_name_in_source2>: <type_to_be_converted>,
            ...
        },
        '<output2_name>': {
            <variable_name_in_source2>: <type_to_be_converted>,
            ...
        },
    },
    ...
}
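For example, to log variable 'RandData0' of source 'Source1' as int in output 'Output1' (all names
here are illustrative): {'Source1': {'Output1': {'RandData0': 'int'}}}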
The format of data_rename_mapping is as follows:
{
'<source1_name>': {
@@ -62,6 +91,8 @@ def __init__(
:param data_sources_mapping: Mapping of multiple data sources
:param data_outputs_mapping: Mapping of multiple data outputs
:param data_type_conversion_mapping: Mapping of data type conversions per data source and data output; None to
use the default data types provided by the data sources. Supported types: 'str', 'int', 'float', 'bool', 'bytes'
:param data_rename_mapping: Rename mapping for each data source and data output; None to use the default names
provided by the data sources
:param kwargs:
@@ -81,14 +112,24 @@
for k, do in data_outputs_mapping.items()
}

# Data type conversion mapping of data sources and outputs
if data_type_conversion_mapping is not None:
# Check data type conversion mapping of data sources and outputs
self._check_data_type_conversion_mapping_input(data_type_conversion_mapping=data_type_conversion_mapping)
# Init the data type conversion mapping (full mapping)
self._data_type_conversion_mapping = self._init_data_type_conversion_mapping(
data_type_conversion_mapping=data_type_conversion_mapping)
else:
self._data_type_conversion_mapping = None

# Check rename mapping of data sources and outputs
if data_rename_mapping is not None:
self._check_data_rename_mapping_input(
data_rename_mapping=data_rename_mapping,
explicit=kwargs.get('data_rename_mapping_explicit', False)
)

# Init the data rename mapping
# Init the data rename mapping (full mapping)
self._data_rename_mapping = self._init_data_rename_mapping(
data_rename_mapping=data_rename_mapping if data_rename_mapping is not None else {},
)
@@ -148,18 +189,45 @@ def __init__(
# Count of logging
self.log_count = 0

def _check_data_rename_mapping_input(self, data_rename_mapping: dict, explicit: bool):
"""Check input dict of data rename mapping"""
def _check_data_source_name(data_source_name):
"""Check if data source name available in data sources"""
if data_source_name not in self._data_sources_mapping.keys():
raise ValueError(f"Invalid data source name '{data_source_name}' for rename mapping")
def _check_data_source_name(self, data_source_name: str):
    """Check if the data source name is available in the data sources"""
    if data_source_name not in self._data_sources_mapping.keys():
        raise ValueError(f"Invalid data source name '{data_source_name}' in mapping")

def _check_data_output_name(self, data_output_name: str):
    """Check if the data output name is available in the data outputs"""
    if data_output_name not in self._data_outputs_mapping.keys():
        raise ValueError(f"Invalid data output name '{data_output_name}' in mapping")

def _check_data_output_name(data_output_name):
"""Check if data output name available in data outputs"""
if data_output_name not in self._data_outputs_mapping.keys():
raise ValueError(f"Invalid data output name '{data_output_name}' for rename mapping")
def _check_data_type_conversion_mapping_input(self, data_type_conversion_mapping: dict):
"""Check input dict of data type conversion mapping"""
# Check data source, data output, and mapping
for ds_name, output_dict in data_type_conversion_mapping.items():
self._check_data_source_name(ds_name)
for do_name, mapping in output_dict.items():
self._check_data_output_name(do_name)
for typ in mapping.values():
if typ not in self._types_of_data_type_conversion:
raise ValueError(f"Invalid data type '{typ}' for data type conversion mapping, it must be one "
f"of '{self._types_of_data_type_conversion}'")

def _init_data_type_conversion_mapping(
self, data_type_conversion_mapping: dict) -> dict[str, dict[str, dict[str, str | None]]]:
"""Init data type conversion mapping for all data sources to all data outputs, if the conversion mapping for a
variable name is unavailable, return None"""
return {
ds_name: {
do_name: {
var: data_type_conversion_mapping.get(ds_name, {}).get(do_name, {}).get(var, None)
for var in ds.all_variable_names
}
for do_name in self._data_outputs_mapping.keys()
}
for ds_name, ds in self._data_sources_mapping.items()
}
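    # Illustrative expansion (names assumed): with one source 'S1' providing
    # variables ('a', 'b'), one output 'O1', and the input mapping
    # {'S1': {'O1': {'a': 'int'}}}, the full mapping becomes
    # {'S1': {'O1': {'a': 'int', 'b': None}}}; a value of None means the
    # variable keeps the data type provided by its source.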

def _check_data_rename_mapping_input(self, data_rename_mapping: dict, explicit: bool):
"""Check input dict of data rename mapping"""
def _explicit_check_rename_mapping(data_source_name, rename_mapping):
"""Explicit check if all keys in the rename mapping are available in data source"""
for key in rename_mapping.keys():
@@ -171,9 +239,9 @@ def _explicit_check_rename_mapping(data_source_name, rename_mapping):

# Check data source, data output, and mapping
for ds_name, output_dict in data_rename_mapping.items():
_check_data_source_name(ds_name)
self._check_data_source_name(ds_name)
for do_name, mapping in output_dict.items():
_check_data_output_name(do_name)
self._check_data_output_name(do_name)
if explicit:
_explicit_check_rename_mapping(ds_name, mapping)

@@ -238,11 +306,25 @@ def read_data_all_sources(self) -> dict[str, dict]:
}

def log_data_all_outputs(self, data: dict[str, dict], timestamp: str | None = None):
    """Log data to all data outputs"""
def process_variable_name(data_source_name: str, data_output_name: str, variable_name: str) -> str:
# Rename the variable based on rename mapping
return self._data_rename_mapping[data_source_name][data_output_name][variable_name]

def process_variable_value(data_source_name: str, data_output_name: str, variable_name: str, value):
if self._data_type_conversion_mapping is None:
# No data type conversion
return value
else:
return self.convert_data_type(
value=value,
data_type=self._data_type_conversion_mapping[data_source_name][data_output_name][variable_name]
)

"""Log data to all data outputs"""
for do_name, do in self._data_outputs_mapping.items():
# Unzip, rename, and type-convert values for the current output
unzipped_data = {
self._data_rename_mapping[ds_name][do_name][var]: value
process_variable_name(ds_name, do_name, var): process_variable_value(ds_name, do_name, var, value)
for ds_name, ds_data in data.items()
for var, value in ds_data.items()
}
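            # Illustrative example (names assumed): with data == {'S1': {'a': 1}}, a
            # rename mapping 'a' -> 'S1_a', and a type conversion 'a' -> 'str', this
            # output receives unzipped_data == {'S1_a': '1'}.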
@@ -276,18 +358,48 @@ def get_timestamp_now() -> str:
"""Get the timestamp by now"""
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())

@staticmethod
def convert_data_type(value, data_type: str | None) -> bool | str | int | float | bytes | None:
"""Convert a single value to defined type"""
if value is None:
return None

if data_type is None:
return value

try:
if data_type == 'str':
return str(value)
elif data_type == 'int':
return int(value)
elif data_type == 'float':
return float(value)
elif data_type == 'bool':
# Any truthy value converts to True; 0, empty string, and None convert to False
return bool(value) and value not in (0, '', None)
elif data_type == 'bytes':
# Convert to bytes using UTF-8 encoding by default
return bytes(str(value), 'utf-8')
else:
raise ValueError(f"Unsupported data type '{data_type}'")
except (ValueError, TypeError):
logger.warning(f"Could not convert value '{value}' to type '{data_type}'")
return value
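    # A few results that follow from the logic above (not additional API):
    #   convert_data_type('3.5', 'float') -> 3.5
    #   convert_data_type(0, 'bool')      -> False
    #   convert_data_type('0', 'bool')    -> True   (truthy non-empty string)
    #   convert_data_type('abc', 'int')   -> 'abc'  (warning logged, value returned unchanged)
    #   convert_data_type(None, 'str')    -> None   (None is never converted)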


class DataLoggerTimeTrigger(DataLoggerBase):
def __init__(
self,
data_sources_mapping: dict[str, DataSource.DataSourceBase | DataSourceOutput.DataSourceOutputBase],
data_outputs_mapping: dict[str, DataOutput.DataOutputBase | DataSourceOutput.DataSourceOutputBase],
data_type_conversion_mapping: dict[str, dict[str, dict[str, str]]] | None = None,
data_rename_mapping: dict[str, dict[str, dict[str, str]]] | None = None,
**kwargs
):
"""Time triggerd data logger"""
logger.info("Initializing DataLoggerTimeTrigger ...")
super().__init__(data_sources_mapping, data_outputs_mapping, data_rename_mapping, **kwargs)
super().__init__(
data_sources_mapping, data_outputs_mapping, data_type_conversion_mapping, data_rename_mapping, **kwargs)

def run_data_logging(self, interval: int | float, duration: int | float | None):
"""
6 changes: 6 additions & 0 deletions ebcmeasurements/Base/DataSource.py
@@ -43,6 +43,8 @@ def __init__(self, size: int = 10, key_missing_rate: float = 0.5, value_missing_
:param size: Number of variables to generate
:param key_missing_rate: Probability of a key being excluded from the final dictionary
:param value_missing_rate: Probability of assigning None to a value instead of a random float
Default variable names are formatted as 'RandData<n>'.
"""
super().__init__()
if not (0.0 <= key_missing_rate <= 1.0):
@@ -73,6 +75,8 @@ def __init__(
:param str_length: Length of each random string
:param key_missing_rate: Probability of a key being excluded from the final dictionary
:param value_missing_rate: Probability of assigning None to a value instead of a random float
Default variable names are formatted as 'RandStr<n>'.
"""
super().__init__(size, key_missing_rate, value_missing_rate)
self.str_length = str_length
@@ -99,6 +103,8 @@ def __init__(
:param size: Number of variables to generate
:param key_missing_rate: Probability of a key being excluded from the final dictionary
:param value_missing_rate: Probability of assigning None to a value instead of a random float
Default variable names are formatted as 'RandBool<n>'.
"""
super().__init__(size, key_missing_rate, value_missing_rate)
self._all_variable_names = tuple(f'RandBool{n}' for n in range(self.size)) # Re-define all data names
