From b0d2ae76aff5264c99295a4beb6bd10b9667d0f1 Mon Sep 17 00:00:00 2001 From: MeenBna Date: Thu, 21 Mar 2024 15:46:18 +0100 Subject: [PATCH] Enhanced event handling for OPC UA server APIs interactions. --- ...ploring_API_Functions_Authentication.ipynb | 30 +- .../Test_API_Functions_New_Features.ipynb | 674 ------------------ src/pyprediktormapclient/opc_ua.py | 160 +++++ 3 files changed, 189 insertions(+), 675 deletions(-) delete mode 100644 notebooks/Test_API_Functions_New_Features.ipynb diff --git a/notebooks/Exploring_API_Functions_Authentication.ipynb b/notebooks/Exploring_API_Functions_Authentication.ipynb index ad1693e..071a7a2 100644 --- a/notebooks/Exploring_API_Functions_Authentication.ipynb +++ b/notebooks/Exploring_API_Functions_Authentication.ipynb @@ -237,6 +237,34 @@ "opc_data = OPC_UA(rest_url=opcua_rest_url, opcua_url=opcua_server_url, namespaces=namespace_list, auth_client=auth_client)" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Getting all event types\n", + "event_types = opc_data.get_event_types()\n", + "event_types" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Reading one month historical events of trackers \n", + "hist_events = opc_data.read_historical_events(\n", + " start_time=(datetime.datetime.now() - datetime.timedelta(30)),\n", + " end_time=(datetime.datetime.now() - datetime.timedelta(1)),\n", + " variable_list=trackers.variables_as_list([\"AngleMeasured\", \"AngleSetpoint\"], True),\n", + " fields_list=[\"Time\", \"Message\", \"SourceName\", \"Severity\", \"Active\", \"Acknowledged\", \"Comment\", \"EventTypeName\"],\n", + " event_type_name=\"InverterEventType\",\n", + ")\n", + "hist_events" + ] + }, { "cell_type": "code", "execution_count": null, @@ -284,7 +312,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.8" + "version": "3.12.2" } }, "nbformat": 4, diff --git a/notebooks/Test_API_Functions_New_Features.ipynb b/notebooks/Test_API_Functions_New_Features.ipynb deleted file mode 100644 index 5dfc61d..0000000 --- a/notebooks/Test_API_Functions_New_Features.ipynb +++ /dev/null @@ -1,674 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Import Libraries" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Import the required packages and libraries\n", - "import datetime\n", - "import os\n", - "from dotenv import load_dotenv \n", - "from pathlib import Path\n", - "from typing import List\n", - "import pandas as pd\n", - "import copy\n", - "import json\n", - "from requests import HTTPError" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Import Scripts" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from pyprediktormapclient.opc_ua import OPC_UA\n", - "from pyprediktormapclient.model_index import ModelIndex\n", - "from pyprediktormapclient.auth_client import AUTH_CLIENT\n", - "from pyprediktormapclient.analytics_helper import AnalyticsHelper\n", - "from pyprediktormapclient.shared import *" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Import Envrionment Variables" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Consider obtaining the envrionment variables from .env file if you are running this locally from source.\n", - "dotenv_path = Path(\"../.env\")\n", - "load_dotenv(dotenv_path=dotenv_path)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "username = os.environ[\"USERNAME\"]\n", - "password = os.environ[\"PASSWORD\"]\n", - "opcua_rest_url = os.environ[\"OPC_UA_REST_URL\"]\n", - "opcua_server_url = os.environ[\"OPC_UA_SERVER_URL\"]\n", - "model_index_url = os.environ[\"MODEL_INDEX_URL\"]\n", - "ory_url = os.environ[\"ORY_URL\"]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Getting ory bearer token\n", - "auth_client = AUTH_CLIENT(rest_url=ory_url, username=username, password=password)\n", - "auth_client.request_new_ory_token()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Download data from modelindex api" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Connecting to ModelIndex APIs \n", - "model_data = ModelIndex(url=model_index_url)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Listed sites on the model index api server\n", - "namespaces = model_data.get_namespace_array()\n", - "namespaces" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Types of Objects\n", - "object_types_json = model_data.get_object_types()\n", - "object_types = AnalyticsHelper(object_types_json)\n", - "object_types.dataframe" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Unique types of Objects\n", - "object_types_unique = object_types.dataframe[[\"Id\", \"Name\"]].drop_duplicates()\n", - "object_types_unique" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# To get typeId by type name of an object\n", - "object_type_id = model_data.get_object_type_id_from_name(\"SiteType\")\n", - "object_type_id" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "str(object_type_id.split(\":\")[1])" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# To get the objects of a type\n", - "sites_json = model_data.get_objects_of_type(\"SiteType\")\n", - "\n", - "# Send the returned JSON into a normalizer to get Id, Type, Name, Props and Vars as columns\n", - "sites = AnalyticsHelper(sites_json)\n", - "sites.list_of_names()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Analytics helper\n", - "sites.variables_as_dataframe()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "sites.list_of_ids()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Selecting the single site\n", - "site_id = sites.list_of_ids()[0]\n", - "site_id" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Get all stringsets for one park\n", - "string_sets_for_first_park_as_json = model_data.get_object_descendants(\n", - " \"StringSetType\", [site_id], \"PV_Assets\"\n", - ")\n", - "string_sets_for_first_park = AnalyticsHelper(string_sets_for_first_park_as_json)\n", - "string_sets_for_first_park.dataframe" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Ancestors of an object type, get all trackers that are ancestor of the parks string sets\n", - "\n", - "trackers_as_json = model_data.get_object_ancestors(\n", - " \"TrackerType\", string_sets_for_first_park.list_of_ids(), \"PV_Serves\"\n", - ")\n", - "trackers = AnalyticsHelper(trackers_as_json)\n", - "trackers.variables_as_dataframe()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Download data from the opc ua api" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "namespace_list = object_types.namespaces_as_list(namespaces)\n", - "\n", - "# Initating the OPC UA API with a fixed namespace list\n", - "opc_data = OPC_UA(rest_url=opcua_rest_url, opcua_url=opcua_server_url, namespaces=namespace_list)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Read Historical Events" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def get_historical_raw_values(opc_data,\n", - " start_time,\n", - " end_time,\n", - " variable_list,\n", - " limit_start_index=None,\n", - " limit_num_records=None,\n", - ") -> pd.DataFrame:\n", - " \n", - " # Create a new variable list to remove pydantic models\n", - " vars = opc_data._get_variable_list_as_list(variable_list)\n", - "\n", - " extended_variables = []\n", - " for var in vars:\n", - " extended_variables.append(\n", - " {\n", - " \"NodeId\": var,\n", - " }\n", - " )\n", - " body = copy.deepcopy(opc_data.body)\n", - " body[\"StartTime\"] = start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", - " body[\"EndTime\"] = end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", - " body[\"ReadValueIds\"] = extended_variables\n", - " if limit_start_index is not None and limit_num_records is not None:\n", - " body[\"Limit\"] = {\n", - " \"StartIndex\": limit_start_index,\n", - " \"NumRecords\": limit_num_records\n", - " }\n", - " \n", - " print(body)\n", - "\n", - " # Try making the request, if fails check if it is due to ory client\n", - " content = request_from_api(\n", - " rest_url=opcua_rest_url,\n", - " method=\"POST\",\n", - " endpoint=\"values/historical\",\n", - " data=json.dumps(body, default=opc_data.json_serial),\n", - " headers=opc_data.headers,\n", - " extended_timeout=True,\n", - " )\n", - "\n", - " df_result = pd.json_normalize(content, record_path=['HistoryReadResults', 'DataValues'], meta=[['HistoryReadResults', 'NodeId', 'IdType'], ['HistoryReadResults', 'NodeId','Id'],['HistoryReadResults', 'NodeId','Namespace']] )\n", - " df_result.rename(\n", - " columns={\n", - " \"Value.Type\": \"ValueType\",\n", - " \"Value.Body\": \"Value\",\n", - " \"SourceTimestamp\": \"Timestamp\",\n", - " \"HistoryReadResults.NodeId.IdType\": \"IdType\",\n", - " \"HistoryReadResults.NodeId.Id\": \"Id\",\n", - " \"HistoryReadResults.NodeId.Namespace\": \"Namespace\",\n", - " },\n", - " errors=\"raise\",\n", - " inplace=True,\n", - " )\n", - "\n", - " return df_result" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "base_event_type = \"0:0:2782\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def get_event_types(opc_data, base_event_type: str) -> pd.DataFrame:\n", - "\n", - " namespace, id_type, id = map(int, base_event_type.split(':'))\n", - "\n", - " body = copy.deepcopy(opc_data.body)\n", - " body[\"BaseEventType\"] = {\n", - " \"Id\": str(id),\n", - " \"Namespace\": namespace,\n", - " \"IdType\": id_type\n", - " }\n", - " print(body)\n", - "\n", - " try:\n", - " # Try making the request, if fails check if it is due to ory client\n", - " content = request_from_api(\n", - " rest_url=opcua_rest_url,\n", - " method=\"POST\",\n", - " endpoint=\"events/types\",\n", - " data=json.dumps(body, default=opc_data.json_serial),\n", - " headers=opc_data.headers,\n", - " extended_timeout=True,\n", - " )\n", - "\n", - " except HTTPError as e:\n", - " if opc_data.auth_client is not None:\n", - " opc_data.check_auth_client(json.loads(e.response.content))\n", - " else:\n", - " raise RuntimeError(f'Error message {e}')\n", - " \n", - " df_result = pd.DataFrame(content['EventTypes'])\n", - "\n", - " df_result['BrowseName'] = df_result['BrowseName'].apply(lambda x: x.get('Name', None))\n", - " df_result['Id'] = df_result['NodeId'].apply(lambda x: x.get('Id', None))\n", - " df_result['Namespace'] = df_result['NodeId'].apply(lambda x: x.get('Namespace'))\n", - "\n", - " df_result['Namespace'] = df_result['Namespace'].fillna(0).astype(int)\n", - " df_result.drop(columns=['NodeId', 'DisplayName'], inplace=True)\n", - " \n", - "\n", - " return df_result" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "df_result = get_event_types(opc_data, base_event_type)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def get_event_type_id_from_name(df_result, type_name: str) -> str:\n", - " \"\"\"Get event type id and namespace from type name\n", - "\n", - " Args:\n", - " type_name (str): type name\n", - "\n", - " Returns:\n", - " str: an object of event type id and namespace in the form of a tuple\n", - " \"\"\"\n", - " event_type = df_result[df_result[\"BrowseName\"] == type_name]\n", - " if not event_type.empty:\n", - " event_type_id, namespace = event_type[[\"Id\", \"Namespace\"]].values[0]\n", - " else:\n", - " event_type_id = None\n", - " \n", - " event_type_id = f\"{namespace}:0:{event_type_id}\"\n", - "\n", - " return event_type_id" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "type_id = get_event_type_id_from_name(df_result, \"SiteEventType\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "int(type_id.split(\":\")[1])" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "type_id[1]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "df_result[df_result[\"BrowseName\"]==\"TrackerEventType\"]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "object_type_id = model_data.get_object_type_id_from_name(\"EnergyAndPowerMeterEventType\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "start_time=(datetime.datetime.now() - datetime.timedelta(30))\n", - "end_time=(datetime.datetime.now() - datetime.timedelta(29))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "variable_list=trackers.variables_as_list([\"AngleMeasured\", \"AngleSetpoint\"], True)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "fields_list = [\"Time\", \"Message\", \"SourceName\"]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "event_type_noded_id = get_event_type_id_from_name(df_result, \"TrackerEventType\")\n", - "event_type_noded_id" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def read_historical_events(opc_data,\n", - " start_time,\n", - " end_time,\n", - " variable_list,\n", - " fields_list,\n", - " event_type_noded_id=None,\n", - " limit_start_index=None,\n", - " limit_num_records=None,\n", - ") -> pd.DataFrame:\n", - " \n", - " # Create a new variable list to remove pydantic models\n", - " vars = opc_data._get_variable_list_as_list(variable_list)\n", - "\n", - " extended_variables = []\n", - " for var in vars:\n", - " extended_variables.append(\n", - " {\n", - " \"NodeId\": var,\n", - " }\n", - " )\n", - "\n", - " body = copy.deepcopy(opc_data.body)\n", - " body[\"StartTime\"] = start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", - " body[\"EndTime\"] = end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", - " body[\"Fields\"] = fields_list\n", - " body[\"ReadValueIds\"] = extended_variables\n", - "\n", - " if event_type_noded_id:\n", - " body[\"WhereClause\"] = {\n", - " \"EventTypeNodedId\": {\n", - " \"Id\": int(event_type_noded_id.split(\":\")[2]),\n", - " \"Namespace\": int(event_type_noded_id.split(\":\")[0]),\n", - " \"IdType\": int(event_type_noded_id.split(\":\")[1])\n", - " }\n", - " }\n", - " if limit_start_index is not None and limit_num_records is not None:\n", - " body[\"Limit\"] = {\n", - " \"StartIndex\": limit_start_index,\n", - " \"NumRecords\": limit_num_records\n", - " }\n", - " \n", - " print(body)\n", - "\n", - " # Try making the request, if fails check if it is due to ory client\n", - " content = request_from_api(\n", - " rest_url=opcua_rest_url,\n", - " method=\"POST\",\n", - " endpoint=\"events/read\",\n", - " data=json.dumps(body, default=opc_data.json_serial),\n", - " headers=opc_data.headers,\n", - " extended_timeout=True,\n", - " )\n", - "\n", - " df_result = pd.json_normalize(content, record_path=[\"EventsResult\"])\n", - " \n", - " df_hist_event = df_result.explode('HistoryEvents')\n", - " df_hist_event_normalized = pd.json_normalize(df_hist_event['HistoryEvents'])\n", - " df_hist_event_normalized = df_hist_event_normalized[fields_list]\n", - "\n", - " df_final = pd.concat([df_hist_event[df_hist_event.columns.difference(['HistoryEvents'])].reset_index(drop=True), df_hist_event_normalized.reset_index(drop=True)], axis=1)\n", - " new_columns = fields_list + [col for col in df_final.columns if col not in fields_list]\n", - " df_final = df_final[new_columns]\n", - " df_final.rename(\n", - " columns={\n", - " \"NodeId.Id\": \"Id\",\n", - " \"NodeId.IdType\": \"IdType\",\n", - " \"NodeId.Namespace\": \"Namespace\",\n", - " \"StatusCode.Code\": \"StatusCode\",\n", - " \"StatusCode.Symbol\": \"Quality\",\n", - " },\n", - " errors=\"raise\",\n", - " inplace=True,\n", - " )\n", - "\n", - " return df_final" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "df_hist_event = read_historical_events(opc_data,\n", - " start_time,\n", - " end_time,\n", - " variable_list,\n", - " fields_list,\n", - " event_type_noded_id=None,\n", - " limit_start_index=None,\n", - " limit_num_records=None,\n", - ")\n", - "df_hist_event" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### End" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Live value data of trackers\n", - "live_value = opc_data.get_values(\n", - " trackers.variables_as_list([\"AngleMeasured\", \"AngleSetpoint\"])\n", - ")\n", - "live_value" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Raw historic value data of trackers\n", - "one_day_raw_historic_tracker_data = opc_data.get_historical_raw_values(\n", - " start_time=(datetime.datetime.now() - datetime.timedelta(30)),\n", - " end_time=(datetime.datetime.now() - datetime.timedelta(29)),\n", - " variable_list=trackers.variables_as_list([\"AngleSetpoint\"]),\n", - ")\n", - "one_day_raw_historic_tracker_data" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Aggregated historic value data of trackers, 1 days worth of data 30 days ago\n", - "one_day_historic_tracker_data = opc_data.get_historical_aggregated_values(\n", - " start_time=(datetime.datetime.now() - datetime.timedelta(30)),\n", - " end_time=(datetime.datetime.now() - datetime.timedelta(29)),\n", - " pro_interval=3600000,\n", - " agg_name=\"Average\",\n", - " variable_list=trackers.variables_as_list([\"AngleMeasured\"]),\n", - ")\n", - "one_day_historic_tracker_data" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": ".venv_auth", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.12.2" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/src/pyprediktormapclient/opc_ua.py b/src/pyprediktormapclient/opc_ua.py index 002487c..f160e2b 100644 --- a/src/pyprediktormapclient/opc_ua.py +++ b/src/pyprediktormapclient/opc_ua.py @@ -225,6 +225,166 @@ def _get_variable_list_as_list(self, variable_list: list) -> list: raise TypeError("Unsupported type in variable_list") return new_vars + + def get_event_types(self, + base_event_type_id: str = "0:0:2782" + ) -> pd.DataFrame: + """ + Fetches event types based on the provided event type id or by default uses base event type id for all event types. + + Args: + base_event_type_id (str): The base event type id in the format 'namespace:id_type:id'. + + Returns: + pd.DataFrame: A DataFrame containing the BrowseName, Id, and Namespace of each event type. + """ + + namespace, id_type, id = map(int, base_event_type_id.split(':')) + + body = copy.deepcopy(self.body) + body["BaseEventType"] = { + "Id": str(id), + "Namespace": namespace, + "IdType": id_type + } + + try: + content = request_from_api( + rest_url=self.rest_url, + method="POST", + endpoint="events/types", + data=json.dumps(body, default=self.json_serial), + headers=self.headers, + extended_timeout=True, + ) + + except HTTPError as e: + if self.auth_client is not None: + self.check_auth_client(json.loads(e.response.content)) + else: + raise RuntimeError(f'Error message {e}') + + df_result = pd.DataFrame(content['EventTypes']) + + df_result['BrowseName'] = df_result['BrowseName'].apply(lambda x: x.get('Name', None)) + df_result['Id'] = df_result['NodeId'].apply(lambda x: x.get('Id', None)) + df_result['Namespace'] = df_result['NodeId'].apply(lambda x: x.get('Namespace')) + + df_result['Namespace'] = df_result['Namespace'].fillna(0).astype(int) + df_result.drop(columns=['NodeId', 'DisplayName'], inplace=True) + + return df_result + + def get_event_type_id_from_name(self, event_type_name: str) -> str: + """Get event type id and namespace from type name + + Args: + type_name (str): event type name + + Returns: + str: an object of event type id and namespace in the form of a tuple + """ + df_result = self.get_event_types() + event_type = df_result[df_result["BrowseName"] == event_type_name] + + if not event_type.empty: + event_type_id, namespace = event_type[["Id", "Namespace"]].values[0] + else: + event_type_id = None + + event_type_id = f"{namespace}:0:{event_type_id}" + + return event_type_id + + def read_historical_events(self, + start_time: datetime, + end_time: datetime, + variable_list: List[Variables], + fields_list: List[str], + event_type_name: str, + limit_start_index: Union[int, None] = None, + limit_num_records: Union[int, None] = None, + ) -> pd.DataFrame: + """ + Reads historical events from an API. + + Args: + start_time (datetime): The start time for the historical data. + end_time (datetime): The end time for the historical data. + variable_list (List[Variables]): A list of variables to include in the request. + fields_list (List[str]): A list of fields to include in the request. + event_type_name (str, optional): The name of the event type to filter by. Defaults to None. + limit_start_index (Union[int, None], optional): The starting index for the limit. Defaults to None. + limit_num_records (Union[int, None], optional): The number of records for the limit. Defaults to None. + + Returns: + pd.DataFrame: A DataFrame containing the historical events. + """ + + # Create a new variable list to remove pydantic models + vars = self._get_variable_list_as_list(variable_list) + + extended_variables = [] + for var in vars: + extended_variables.append( + { + "NodeId": var, + } + ) + + body = copy.deepcopy(self.body) + body["StartTime"] = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") + body["EndTime"] = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") + body["Fields"] = fields_list + body["ReadValueIds"] = extended_variables + + event_type_noded_id = self.get_event_type_id_from_name(event_type_name) + + if event_type_noded_id: + body["WhereClause"] = { + "EventTypeNodedId": { + "Id": int(event_type_noded_id.split(":")[2]), + "Namespace": int(event_type_noded_id.split(":")[0]), + "IdType": int(event_type_noded_id.split(":")[1]) + } + } + if limit_start_index is not None and limit_num_records is not None: + body["Limit"] = { + "StartIndex": limit_start_index, + "NumRecords": limit_num_records + } + + content = request_from_api( + rest_url=self.rest_url, + method="POST", + endpoint="events/read", + data=json.dumps(body, default=self.json_serial), + headers=self.headers, + extended_timeout=True, + ) + + df_result = pd.json_normalize(content, record_path=["EventsResult"]) + df_hist_event = df_result.explode('HistoryEvents') + df_hist_event_normalized = pd.json_normalize(df_hist_event['HistoryEvents']) + df_hist_event_normalized = df_hist_event_normalized[fields_list] + + df_final = pd.concat([df_hist_event[df_hist_event.columns.difference(['HistoryEvents'])].reset_index(drop=True), df_hist_event_normalized.reset_index(drop=True)], axis=1) + new_columns = fields_list + [col for col in df_final.columns if col not in fields_list] + df_final = df_final[new_columns] + df_final.rename( + columns={ + "NodeId.Id": "Id", + "NodeId.IdType": "IdType", + "NodeId.Namespace": "Namespace", + "StatusCode.Code": "StatusCode", + "StatusCode.Symbol": "Quality", + }, + errors="raise", + inplace=True, + ) + + df_final.drop(columns=["IdType", "Namespace", "StatusCode", "Quality"], inplace=True) + return df_final def get_values(self, variable_list: List[Variables]) -> List: