diff --git a/.github/workflows/pages.yml b/.github/workflows/pages.yml index 4262a53..932885d 100644 --- a/.github/workflows/pages.yml +++ b/.github/workflows/pages.yml @@ -11,6 +11,10 @@ jobs: - uses: actions/checkout@master with: fetch-depth: 0 # otherwise, you will failed to push refs to dest repo + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt - name: Build and Commit uses: sphinx-notes/pages@v2 - name: Push changes diff --git a/.github/workflows/tox.yml b/.github/workflows/tox.yml index b229aaa..46128a5 100644 --- a/.github/workflows/tox.yml +++ b/.github/workflows/tox.yml @@ -64,6 +64,7 @@ jobs: python -VV python -m pip install --upgrade pip python -m pip install tox tox-gh-actions pytest + python -m pip install -r requirements.txt python -m pip install -e . echo "*** pyodbc version" python -c "import pyodbc; print(pyodbc.version)" diff --git a/notebooks/Example_Data_Downloading.ipynb b/notebooks/Example_Data_Downloading.ipynb index f16eed4..6441fcb 100644 --- a/notebooks/Example_Data_Downloading.ipynb +++ b/notebooks/Example_Data_Downloading.ipynb @@ -103,16 +103,6 @@ "model.get_objects_of_type(\"InverterType\")" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "sites_json = model.get_objects_of_type(\"SiteType\")\n", - "sites_json" - ] - }, { "cell_type": "code", "execution_count": null, diff --git a/notebooks/Exploring_API_Functions_Authentication.ipynb b/notebooks/Exploring_API_Functions_Authentication.ipynb index 9d5246e..865a5a6 100644 --- a/notebooks/Exploring_API_Functions_Authentication.ipynb +++ b/notebooks/Exploring_API_Functions_Authentication.ipynb @@ -17,7 +17,10 @@ "import datetime\n", "import os\n", "from dotenv import load_dotenv \n", - "from pathlib import Path" + "from pathlib import Path\n", + "import nest_asyncio\n", + "\n", + "nest_asyncio.apply()" ] }, { @@ -256,15 +259,11 @@ "metadata": {}, "outputs": [], "source": [ - "# History data\n", - "inverters_hist_df = opc_data.get_historical_aggregated_values(\n", - " start_time=datetime.datetime.now() - datetime.timedelta(2),\n", - " end_time=datetime.datetime.now() - datetime.timedelta(1),\n", - " pro_interval=600000,\n", - " agg_name=\"Average\",\n", - " variable_list=inverters.variables_as_list([\"DCPower\"]),\n", + "# Live value data of trackers\n", + "live_value = opc_data.get_values(\n", + " trackers.variables_as_list([\"AngleMeasured\"])\n", ")\n", - "inverters_hist_df" + "live_value" ] }, { @@ -273,11 +272,15 @@ "metadata": {}, "outputs": [], "source": [ - "# Live value data of trackers\n", - "live_value = opc_data.get_values(\n", - " trackers.variables_as_list([\"AngleMeasured\"])\n", + "# 1 day aggregated historical data\n", + "one_day_historical_data = opc_data.get_historical_aggregated_values(\n", + " start_time=(datetime.datetime.now() - datetime.timedelta(30)),\n", + " end_time=(datetime.datetime.now() - datetime.timedelta(29)),\n", + " pro_interval=60*1000,\n", + " agg_name=\"Average\",\n", + " variable_list=string_sets_for_first_park.variables_as_list([\"DCPower\"])\n", ")\n", - "live_value" + "one_day_historical_data" ] }, { @@ -286,15 +289,13 @@ "metadata": {}, "outputs": [], "source": [ - "# Historic value data of trackers, 1 days worth of data 30 days ago\n", - "one_day_historic_tracker_data = opc_data.get_historical_aggregated_values(\n", - " start_time=(datetime.datetime.now() - datetime.timedelta(30)),\n", - " end_time=(datetime.datetime.now() - 
datetime.timedelta(29)),\n", - " pro_interval=3600000,\n", - " agg_name=\"Average\",\n", - " variable_list=trackers.variables_as_list([\"AngleMeasured\"]),\n", + "# 1 day raw historical data\n", + "one_day_raw_historical_data = opc_data.get_raw_historical_values(\n", + " start_time = datetime.datetime(2024, 7, 13, 00, 00),\n", + " end_time = datetime.datetime(2024, 7, 13, 23, 59),\n", + " variable_list=string_sets_for_first_park.variables_as_list([\"DCPower\"])\n", ")\n", - "one_day_historic_tracker_data" + "one_day_raw_historical_data" ] }, { diff --git a/notebooks/api_performance_testing.ipynb b/notebooks/api_performance_testing.ipynb new file mode 100644 index 0000000..5022eda --- /dev/null +++ b/notebooks/api_performance_testing.ipynb @@ -0,0 +1,2188 @@ +{ + "cells": [ + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This notebook explores both model index and opc ua scripts and contain examples of all the functions to make request to model index api and opc ua api servers. " + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Import Libraries" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Import the required packeages\n", + "import pandas as pd\n", + "import os\n", + "import json\n", + "import datetime\n", + "import concurrent.futures\n", + "from dotenv import load_dotenv\n", + "from pathlib import Path\n", + "from dateutil.relativedelta import relativedelta" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Import Scripts" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Import model index functions\n", + "from pyprediktormapclient.model_index import ModelIndex\n", + "\n", + "# Import OPC UA functions\n", + "from pyprediktormapclient.opc_ua import OPC_UA\n", + "\n", + "# Import Analytics Helper\n", + "from pyprediktormapclient.analytics_helper import AnalyticsHelper\n", + "\n", + "# Import \"Dataframer\" Tools\n", + "from pyprediktormapclient.shared import *\n", + "\n", + "# import AUTH_CLIENT\n", + "from pyprediktormapclient.auth_client import AUTH_CLIENT" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Consider obtaining the envrionment variables from .env file if you are running this locally from source.\n", + "dotenv_path = Path(\".env\")\n", + "load_dotenv(dotenv_path=dotenv_path)\n", + "\n", + "username = os.environ[\"USERNAME\"]\n", + "password = os.environ[\"PASSWORD\"]\n", + "opcua_rest_url = os.environ[\"OPC_UA_REST_URL\"]\n", + "opcua_server_url = os.environ[\"OPC_UA_SERVER_URL\"]\n", + "model_index_url = os.environ[\"MODEL_INDEX_URL\"]\n", + "ory_url = os.environ[\"ORY_URL\"]\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Getting ory bearer token\n", + "auth_client = AUTH_CLIENT(rest_url=ory_url, username=username, password=password)\n", + "auth_client.request_new_ory_token()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Connecting to ModelIndex APIs \n", + "model = ModelIndex(url=model_index_url, auth_client=auth_client, session=auth_client.session)\n", + "\n", + "# Listed sites on the model index api server\n", + "namespaces = model.get_namespace_array()\n", + "# Types of Objects\n", + 
"object_types_json = model.get_object_types()\n", + "object_types = AnalyticsHelper(object_types_json)\n", + "namespace_list = object_types.namespaces_as_list(namespaces)\n", + "\n", + "# Initate the OPC UA API with a fixed namespace list\n", + "opc_data = OPC_UA(rest_url=opcua_rest_url, opcua_url=opcua_server_url, namespaces=namespace_list, auth_client=auth_client)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Download data from modelindex api" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Unique types of Objects\n", + "object_types_unique = object_types.dataframe[[\"Id\", \"Name\"]].drop_duplicates()\n", + "object_types_unique" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# To get the objects of a type\n", + "sites_json = model.get_objects_of_type(\"SiteType\")\n", + "\n", + "# Send the returned JSON into a normalizer to get Id, Type, Name, Props and Vars as columns\n", + "sites = AnalyticsHelper(sites_json)\n", + "sites.list_of_names()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Analytics helper\n", + "sites.variables_as_dataframe()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sites.list_of_ids()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Selecting the second site\n", + "first_site_id = sites.list_of_ids()[0]\n", + "# first_site_id = '14:1:BE.DK-ADU'\n", + "first_site_id" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get all stringsets for one park\n", + "string_sets_for_first_park_as_json = model.get_object_descendants(\n", + " \"StringSetType\", [first_site_id], \"PV_Assets\"\n", + ")\n", + "string_sets = AnalyticsHelper(string_sets_for_first_park_as_json)\n", + "string_sets.dataframe" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Query Parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "variable_list =string_sets.variables_as_list([\"DCPower\"])\n", + "start_time = datetime.datetime(2023, 11, 13, 00, 00)\n", + "end_time = datetime.datetime(2023, 11, 13, 23, 59)\n", + "pro_interval=60*1000\n", + "agg_name=\"Average\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Batching with Async Refactoring" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import logging\n", + "import asyncio\n", + "import aiohttp\n", + "from aiohttp import ClientSession\n", + "from asyncio import Semaphore\n", + "from datetime import timedelta\n", + "from typing import Dict, List, Tuple\n", + "\n", + "logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\n", + "logger = logging.getLogger(__name__)\n", + "\n", + "async def generate_time_batches(start_time: datetime, end_time: datetime, pro_interval: int, max_data_points: int) -> List[tuple]:\n", + " \"\"\"Generate time batches based on start time, end time, processing interval, and batch size\"\"\"\n", + "\n", + " total_time_range_ms = (end_time - start_time).total_seconds() * 1000\n", + " estimated_intervals = total_time_range_ms / 
pro_interval\n", + " \n", + " max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))\n", + " max_time_batches = max(1, int(estimated_intervals / max_data_points))\n", + "\n", + " time_batch_size_ms = total_time_range_ms / max_time_batches\n", + "\n", + " return total_time_range_ms, max_variables_per_batch, time_batch_size_ms, max_time_batches\n", + "\n", + "def generate_variable_batches(start_time, end_time, pro_interval, variable_list: List[Dict[str, str]], max_data_points) -> List:\n", + " \"\"\"Generate variable batches based on the variable list and batch size\"\"\"\n", + "\n", + " extended_variables = [{\"NodeId\": var, \"AggregateName\": agg_name} for var in variable_list]\n", + " max_variables_per_batch = generate_time_batches(start_time, end_time, pro_interval, max_data_points)[1]\n", + "\n", + " variable_batches = [\n", + " extended_variables[i:i + max_variables_per_batch] for i in range(0, len(extended_variables), max_variables_per_batch)\n", + " ]\n", + "\n", + " return variable_batches\n", + "\n", + "def _prepare_body(\n", + " start_time: datetime,\n", + " end_time: datetime,\n", + " pro_interval: int,\n", + " variable_list: List[Dict[str, str]], \n", + " agg_name: str,\n", + " ) -> Dict:\n", + " \"\"\"\n", + " Prepare the request body for the API call.\n", + " \"\"\"\n", + " total_time_range_ms, max_variables_per_batch, time_batch_size_ms, max_time_batches = generate_time_batches(\n", + " start_time, end_time, pro_interval, 10000)\n", + "\n", + " for time_batch in range(max_time_batches):\n", + " batch_start_ms = time_batch * time_batch_size_ms\n", + " batch_end_ms = min((time_batch + 1) * time_batch_size_ms, total_time_range_ms)\n", + " batch_start = start_time + timedelta(milliseconds=batch_start_ms)\n", + " batch_end = start_time + timedelta(milliseconds=batch_end_ms)\n", + "\n", + " variable_batches = generate_variable_batches(variable_list)\n", + "\n", + " for variables in variable_batches:\n", + " body = {\n", + " **opc_data.body,\n", + " \"StartTime\": batch_start.strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\"),\n", + " \"EndTime\": batch_end.strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\"),\n", + " \"ProcessingInterval\": pro_interval,\n", + " \"ReadValueIds\": variables,\n", + " \"AggregateName\": agg_name\n", + " }\n", + " return body\n", + " \n", + "def process_batch(content: dict) -> pd.DataFrame:\n", + " \"\"\" Process individual batch of data \"\"\"\n", + " \n", + " df_list = []\n", + " for item in content[\"HistoryReadResults\"]:\n", + " df = pd.json_normalize(item[\"DataValues\"])\n", + " for key, value in item[\"NodeId\"].items():\n", + " df[f\"HistoryReadResults.NodeId.{key}\"] = value\n", + " df_list.append(df)\n", + " \n", + " if df_list:\n", + " df_result = pd.concat(df_list)\n", + " df_result.reset_index(inplace=True, drop=True)\n", + " return df_result\n", + " else:\n", + " return pd.DataFrame()\n", + " \n", + "async def make_async_api_request(opc_data, start_time:datetime, end_time:datetime,\n", + " pro_interval: int, variable_list: List[Dict[str, str]], agg_name: str,\n", + " semaphore, max_retries: int = 3, retry_delay: int = 5) -> dict:\n", + " \n", + " \"\"\"Make API request for the given time range and variable list\"\"\"\n", + "\n", + " async with semaphore:\n", + " body = _prepare_body(\n", + " start_time, \n", + " end_time, \n", + " pro_interval, \n", + " variable_list,\n", + " agg_name\n", + " )\n", + " for attempt in range(max_retries):\n", + " try:\n", + " async with ClientSession() as session:\n", + " async with session.post(\n", + " 
f\"{opcua_rest_url}values/historicalaggregated\",\n", + " json=body,\n", + " headers=opc_data.headers\n", + " ) as response:\n", + " response.raise_for_status()\n", + " content = await response.json()\n", + " break\n", + " except aiohttp.ClientError as e:\n", + " if attempt < max_retries - 1:\n", + " wait_time = retry_delay * (2 ** attempt)\n", + " logger.warning(f\"Request failed. Retrying in {wait_time} seconds...\")\n", + " await asyncio.sleep(wait_time)\n", + " else:\n", + " logger.error(f\"Max retries reached. Error: {e}\")\n", + " raise RuntimeError(f'Error message {e}')\n", + "\n", + " opc_data._check_content(content)\n", + "\n", + " df_result = process_batch(content)\n", + " return df_result\n", + " \n", + "async def process_api_response(opc_data, start_time:datetime, end_time:datetime,\n", + " pro_interval: int, variable_list: List[Dict[str, str]], agg_name: str,\n", + " max_concurrent_requests: int = 10) -> pd.DataFrame:\n", + " \"\"\" Process API response asynchronously and return the result dataframe \"\"\"\n", + " all_results = []\n", + " semaphore = Semaphore(max_concurrent_requests)\n", + "\n", + " tasks = [\n", + " make_async_api_request(opc_data, start_time, end_time, pro_interval, variable_list, agg_name, semaphore)\n", + " ]\n", + " results = await asyncio.gather(*tasks)\n", + " all_results.extend(results)\n", + " \n", + " if all_results:\n", + " combined_df = pd.concat(all_results, ignore_index=True)\n", + " combined_df.reset_index(inplace=True, drop=True)\n", + " columns = {\n", + " \"Value.Type\": \"ValueType\",\n", + " \"Value.Body\": \"Value\",\n", + " \"StatusCode.Symbol\": \"StatusSymbol\",\n", + " \"StatusCode.Code\": \"StatusCode\",\n", + " \"SourceTimestamp\": \"Timestamp\",\n", + " \"HistoryReadResults.NodeId.IdType\": \"IdType\",\n", + " \"HistoryReadResults.NodeId.Id\": \"Id\",\n", + " \"HistoryReadResults.NodeId.Namespace\": \"Namespace\",\n", + " }\n", + " return opc_data._process_df(combined_df, columns)\n", + " else:\n", + " return pd.DataFrame()\n", + " \n", + "async def get_historical_aggregated_values_async(\n", + " opc_data,\n", + " start_time: datetime,\n", + " end_time: datetime,\n", + " pro_interval: int,\n", + " variable_list: List[Dict[str, str]],\n", + " agg_name: str,\n", + ") -> pd.DataFrame:\n", + " \"\"\"Request historical aggregated values from the OPC UA server with batching\"\"\"\n", + "\n", + " \n", + " result_df = await process_api_response(opc_data, start_time, end_time, pro_interval, variable_list, agg_name)\n", + "\n", + " return result_df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 1 day aggregated historical inverter data in asyncio process\n", + "one_days_historic_inverter_data2 = await get_historical_aggregated_values_batch_time_vars_async(\n", + " start_time=start_time,\n", + " end_time=end_time,\n", + " pro_interval=60*1000,\n", + " agg_name=\"Average\",\n", + " variable_list=string_sets.variables_as_list([\"DCPower\"])\n", + ")\n", + "one_days_historic_inverter_data2" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Batching with Async" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import logging\n", + "import asyncio\n", + "import aiohttp\n", + "from aiohttp import ClientSession\n", + "from asyncio import Semaphore\n", + "from datetime import timedelta\n", + "\n", + "logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - 
%(message)s')\n", + "logger = logging.getLogger(__name__)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def get_historical_aggregated_values_batch_time_vars_async(\n", + " self, \n", + " start_time: datetime, \n", + " end_time: datetime, \n", + " pro_interval: int, \n", + " agg_name: str, \n", + " variable_list: list, \n", + " max_data_points: int = 10000, \n", + " max_retries: int = 3, \n", + " retry_delay: int = 5, \n", + " max_concurrent_requests: int = 10\n", + ") -> pd.DataFrame:\n", + " \n", + " \"\"\"Request historical aggregated values from the OPC UA server with batching\"\"\"\n", + "\n", + " total_time_range_ms = (end_time - start_time).total_seconds() * 1000\n", + " estimated_intervals = total_time_range_ms / pro_interval\n", + "\n", + " max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))\n", + " max_time_batches = max(1, int(estimated_intervals / max_data_points))\n", + " time_batch_size_ms = total_time_range_ms / max_time_batches\n", + "\n", + " extended_variables = [{\"NodeId\": var, \"AggregateName\": agg_name} for var in variable_list]\n", + " variable_batches = [extended_variables[i:i + max_variables_per_batch] for i in range(0, len(extended_variables), max_variables_per_batch)]\n", + "\n", + " all_results = []\n", + " semaphore = Semaphore(max_concurrent_requests)\n", + "\n", + " async def process_batch(variables, time_batch):\n", + " async with semaphore:\n", + " batch_start_ms = time_batch * time_batch_size_ms\n", + " batch_end_ms = min((time_batch + 1) * time_batch_size_ms, total_time_range_ms)\n", + " batch_start = start_time + timedelta(milliseconds=batch_start_ms)\n", + " batch_end = start_time + timedelta(milliseconds=batch_end_ms)\n", + "\n", + " body = {\n", + " **self.body,\n", + " \"StartTime\": batch_start.strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\"),\n", + " \"EndTime\": batch_end.strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\"),\n", + " \"ProcessingInterval\": pro_interval,\n", + " \"ReadValueIds\": variables,\n", + " \"AggregateName\": agg_name\n", + " }\n", + "\n", + " for attempt in range(max_retries):\n", + " try:\n", + " async with ClientSession() as session:\n", + " async with session.post(\n", + " f\"{self.rest_url}values/historicalaggregated\",\n", + " json=body,\n", + " headers=self.headers\n", + " ) as response:\n", + " response.raise_for_status()\n", + " content = await response.json()\n", + " break\n", + " except aiohttp.ClientError as e:\n", + " if attempt < max_retries - 1:\n", + " wait_time = retry_delay * (2 ** attempt)\n", + " logger.warning(f\"Request failed. Retrying in {wait_time} seconds...\")\n", + " await asyncio.sleep(wait_time)\n", + " else:\n", + " logger.error(f\"Max retries reached. 
Error: {e}\")\n", + " raise RuntimeError(f'Error message {e}')\n", + "\n", + " self._check_content(content)\n", + "\n", + " df_list = []\n", + " for item in content[\"HistoryReadResults\"]:\n", + " df = pd.json_normalize(item[\"DataValues\"])\n", + " for key, value in item[\"NodeId\"].items():\n", + " df[f\"HistoryReadResults.NodeId.{key}\"] = value\n", + " df_list.append(df)\n", + " \n", + " if df_list:\n", + " df_result = pd.concat(df_list)\n", + " df_result.reset_index(inplace=True, drop=True)\n", + " return df_result\n", + "\n", + " tasks = [\n", + " process_batch(variables, time_batch)\n", + " for variables in variable_batches\n", + " for time_batch in range(max_time_batches)\n", + " ]\n", + "\n", + " results = await asyncio.gather(*tasks)\n", + " all_results.extend(results)\n", + "\n", + " logger.info(\"Combining all batches...\")\n", + " combined_df = pd.concat(all_results, ignore_index=True)\n", + " columns = {\n", + " \"Value.Type\": \"ValueType\",\n", + " \"Value.Body\": \"Value\",\n", + " \"StatusCode.Symbol\": \"StatusSymbol\",\n", + " \"StatusCode.Code\": \"StatusCode\",\n", + " \"SourceTimestamp\": \"Timestamp\",\n", + " \"HistoryReadResults.NodeId.IdType\": \"IdType\",\n", + " \"HistoryReadResults.NodeId.Id\": \"Id\",\n", + " \"HistoryReadResults.NodeId.Namespace\": \"Namespace\",\n", + " }\n", + " return self._process_df(combined_df, columns)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 1 day aggregated historical data\n", + "one_day_historical_data = await get_historical_aggregated_values_batch_time_vars_async(\n", + " opc_data,\n", + " start_time=start_time,\n", + " end_time=end_time,\n", + " pro_interval=pro_interval,\n", + " agg_name=agg_name,\n", + " variable_list=variable_list,\n", + " max_data_points=10000,\n", + " max_concurrent_requests=35\n", + ")\n", + "one_day_historical_data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Batching with Async for Raw Historical Data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from typing import Dict, List, Any, Union, Optional" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def get_raw_historical_values_batch_time_vars_async(\n", + " self, \n", + " start_time: datetime, \n", + " end_time: datetime, \n", + " variable_list: list, \n", + " limit_start_index: Union[int, None] = None, \n", + " limit_num_records: Union[int, None] = None,\n", + " max_data_points: int = 10000, \n", + " max_retries: int = 3, \n", + " retry_delay: int = 5, \n", + " max_concurrent_requests: int = 10\n", + ") -> pd.DataFrame:\n", + " \n", + " \"\"\"Request historical aggregated values from the OPC UA server with batching\"\"\"\n", + "\n", + " total_time_range_ms = (end_time - start_time).total_seconds() * 1000\n", + " estimated_intervals = total_time_range_ms / max_data_points\n", + "\n", + " max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))\n", + " max_time_batches = max(1, int(estimated_intervals / max_data_points))\n", + " time_batch_size_ms = total_time_range_ms / max_time_batches\n", + "\n", + " extended_variables = [{\"NodeId\": var} for var in variable_list]\n", + " variable_batches = [extended_variables[i:i + max_variables_per_batch] for i in range(0, len(extended_variables), max_variables_per_batch)]\n", + "\n", + " all_results = []\n", + " semaphore = 
Semaphore(max_concurrent_requests)\n", + "\n", + " async def process_batch(variables, time_batch):\n", + " async with semaphore:\n", + " batch_start_ms = time_batch * time_batch_size_ms\n", + " batch_end_ms = min((time_batch + 1) * time_batch_size_ms, total_time_range_ms)\n", + " batch_start = start_time + timedelta(milliseconds=batch_start_ms)\n", + " batch_end = start_time + timedelta(milliseconds=batch_end_ms)\n", + "\n", + " body = {\n", + " **self.body,\n", + " \"StartTime\": batch_start.strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\"),\n", + " \"EndTime\": batch_end.strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\"),\n", + " \"ReadValueIds\": variables,\n", + " }\n", + " \n", + " if limit_start_index is not None and limit_num_records is not None:\n", + " body[\"Limit\"] = {\"StartIndex\": limit_start_index, \"NumRecords\": limit_num_records}\n", + "\n", + " for attempt in range(max_retries):\n", + " try:\n", + " async with ClientSession() as session:\n", + " async with session.post(\n", + " f\"{self.rest_url}values/historical\",\n", + " json=body,\n", + " headers=self.headers\n", + " ) as response:\n", + " response.raise_for_status()\n", + " content = await response.json()\n", + " break\n", + " except aiohttp.ClientError as e:\n", + " if attempt < max_retries - 1:\n", + " wait_time = retry_delay * (2 ** attempt)\n", + " logger.warning(f\"Request failed. Retrying in {wait_time} seconds...\")\n", + " await asyncio.sleep(wait_time)\n", + " else:\n", + " logger.error(f\"Max retries reached. Error: {e}\")\n", + " raise RuntimeError(f'Error message {e}')\n", + "\n", + " self._check_content(content)\n", + "\n", + " df_list = []\n", + " for item in content[\"HistoryReadResults\"]:\n", + " df = pd.json_normalize(item[\"DataValues\"])\n", + " for key, value in item[\"NodeId\"].items():\n", + " df[f\"HistoryReadResults.NodeId.{key}\"] = value\n", + " df_list.append(df)\n", + " \n", + " if df_list:\n", + " df_result = pd.concat(df_list)\n", + " df_result.reset_index(inplace=True, drop=True)\n", + " return df_result\n", + "\n", + " tasks = [\n", + " process_batch(variables, time_batch)\n", + " for variables in variable_batches\n", + " for time_batch in range(max_time_batches)\n", + " ]\n", + "\n", + " results = await asyncio.gather(*tasks)\n", + " all_results.extend(results)\n", + "\n", + " logger.info(\"Combining all batches...\")\n", + " combined_df = pd.concat(all_results, ignore_index=True)\n", + " columns = {\n", + " \"Value.Type\": \"ValueType\",\n", + " \"Value.Body\": \"Value\",\n", + " \"SourceTimestamp\": \"Timestamp\",\n", + " \"HistoryReadResults.NodeId.IdType\": \"IdType\",\n", + " \"HistoryReadResults.NodeId.Id\": \"Id\",\n", + " \"HistoryReadResults.NodeId.Namespace\": \"Namespace\",\n", + " }\n", + " return self._process_df(combined_df, columns)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 1 day raw historical data\n", + "one_day_raw_historical_data = await get_raw_historical_values_batch_time_vars_async(\n", + " opc_data,\n", + " start_time=start_time,\n", + " end_time=end_time,\n", + " variable_list=variable_list,\n", + " max_data_points=10000,\n", + " max_concurrent_requests=35\n", + ")\n", + "one_day_raw_historical_data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Async with ClientPool" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import asyncio\n", + "import aiohttp\n", + "from aiohttp import ClientSession\n", + "from asyncio 
import Semaphore\n", + "from typing import List, Dict, Any\n", + "from datetime import datetime, timedelta\n", + "import pandas as pd\n", + "import logging\n", + "from pydantic import AnyUrl, ValidationError" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class ClientPool:\n", + " def __init__(self, num_clients: int, rest_url: str, headers: Dict[str, str]):\n", + " self.clients = asyncio.Queue()\n", + " for _ in range(num_clients):\n", + " self.clients.put_nowait(aiohttp.ClientSession(base_url=rest_url, headers=headers))\n", + " self.num_clients = num_clients\n", + "\n", + " async def get_client(self):\n", + " client = await self.clients.get()\n", + " return client\n", + "\n", + " async def release_client(self, client):\n", + " await self.clients.put(client)\n", + "\n", + " async def close_all(self):\n", + " while not self.clients.empty():\n", + " client = await self.clients.get()\n", + " await client.close()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def request_from_api_async(\n", + " client_pool: ClientPool,\n", + " method: str,\n", + " endpoint: str,\n", + " data: str = None,\n", + " params: Dict[str, Any] = None,\n", + " extended_timeout: bool = False,\n", + ") -> Dict[str, Any]:\n", + " timeout = aiohttp.ClientTimeout(total=300 if extended_timeout else 30)\n", + " client = await client_pool.get_client()\n", + " \n", + " try:\n", + " if method == \"GET\":\n", + " async with client.get(endpoint, params=params, timeout=timeout) as response:\n", + " response.raise_for_status()\n", + " if 'application/json' in response.headers.get('Content-Type', ''):\n", + " return await response.json()\n", + " else:\n", + " return {\"error\": \"Non-JSON response\", \"content\": await response.text()}\n", + " elif method == \"POST\":\n", + " async with client.post(endpoint, data=data, params=params, timeout=timeout) as response:\n", + " response.raise_for_status()\n", + " if 'application/json' in response.headers.get('Content-Type', ''):\n", + " return await response.json()\n", + " else:\n", + " return {\"error\": \"Non-JSON response\", \"content\": await response.text()}\n", + " else:\n", + " raise ValidationError(\"Unsupported method\")\n", + " finally:\n", + " await client_pool.release_client(client)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def get_historical_aggregated_values_batch_time_vars_async(\n", + " self,\n", + " start_time: datetime,\n", + " end_time: datetime,\n", + " pro_interval: int,\n", + " agg_name: str,\n", + " variable_list: List[str],\n", + " max_data_points: int = 100000,\n", + " max_retries: int = 3,\n", + " retry_delay: int = 5,\n", + " max_concurrent_requests: int = 55\n", + ") -> pd.DataFrame:\n", + " logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\n", + " logger = logging.getLogger(__name__)\n", + "\n", + " extended_variables = [{\"NodeId\": var, \"AggregateName\": agg_name} for var in variable_list]\n", + " total_time_range_ms = (end_time - start_time).total_seconds() * 1000\n", + " estimated_intervals = total_time_range_ms / pro_interval\n", + " max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))\n", + " variable_batches = [extended_variables[i:i + max_variables_per_batch] for i in range(0, len(extended_variables), max_variables_per_batch)]\n", + " max_time_batches = max(1, 
int(estimated_intervals / max_data_points))\n", + " time_batch_size_ms = total_time_range_ms / max_time_batches\n", + "\n", + " all_results = []\n", + " semaphore = Semaphore(max_concurrent_requests)\n", + " client_pool = ClientPool(max_concurrent_requests, self.rest_url, self.headers)\n", + "\n", + " async def process_batch(variables, time_batch):\n", + " async with semaphore:\n", + " batch_start_ms = time_batch * time_batch_size_ms\n", + " batch_end_ms = min((time_batch + 1) * time_batch_size_ms, total_time_range_ms)\n", + " batch_start = start_time + timedelta(milliseconds=batch_start_ms)\n", + " batch_end = start_time + timedelta(milliseconds=batch_end_ms)\n", + "\n", + " body = {\n", + " **self.body,\n", + " \"StartTime\": batch_start.strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\"),\n", + " \"EndTime\": batch_end.strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\"),\n", + " \"ProcessingInterval\": pro_interval,\n", + " \"ReadValueIds\": variables,\n", + " \"AggregateName\": agg_name\n", + " }\n", + "\n", + " for attempt in range(max_retries):\n", + " try:\n", + " content = await request_from_api_async(\n", + " client_pool,\n", + " method=\"POST\",\n", + " endpoint=f\"/values/historicalaggregated\",\n", + " data=json.dumps(body, default=self.json_serial),\n", + " extended_timeout=True\n", + " )\n", + " break\n", + " except (aiohttp.ClientError, ValidationError) as e:\n", + " if attempt < max_retries - 1:\n", + " wait_time = retry_delay * (2 ** attempt)\n", + " logger.warning(f\"Request failed. Retrying in {wait_time} seconds...\")\n", + " await asyncio.sleep(wait_time)\n", + " else:\n", + " logger.error(f\"Max retries reached. Error: {e}\")\n", + " raise RuntimeError(f'Error message {e}')\n", + "\n", + " self._check_content(content)\n", + "\n", + " df_list = []\n", + " for item in content[\"HistoryReadResults\"]:\n", + " df = pd.json_normalize(item[\"DataValues\"])\n", + " for key, value in item[\"NodeId\"].items():\n", + " df[f\"HistoryReadResults.NodeId.{key}\"] = value\n", + " df_list.append(df)\n", + " \n", + " if df_list:\n", + " df_result = pd.concat(df_list)\n", + " df_result.reset_index(inplace=True, drop=True)\n", + " return df_result\n", + "\n", + " tasks = [\n", + " process_batch(variables, time_batch)\n", + " for variables in variable_batches\n", + " for time_batch in range(max_time_batches)\n", + " ]\n", + "\n", + " try:\n", + " results = await asyncio.gather(*tasks)\n", + " all_results.extend(results)\n", + "\n", + " logger.info(\"Combining all batches...\")\n", + " combined_df = pd.concat(all_results, ignore_index=True)\n", + " columns = {\n", + " \"Value.Type\": \"ValueType\",\n", + " \"Value.Body\": \"Value\",\n", + " \"StatusCode.Symbol\": \"StatusSymbol\",\n", + " \"StatusCode.Code\": \"StatusCode\",\n", + " \"SourceTimestamp\": \"Timestamp\",\n", + " \"HistoryReadResults.NodeId.IdType\": \"IdType\",\n", + " \"HistoryReadResults.NodeId.Id\": \"Id\",\n", + " \"HistoryReadResults.NodeId.Namespace\": \"Namespace\",\n", + " }\n", + " return self._process_df(combined_df, columns)\n", + " finally:\n", + " await client_pool.close_all()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import datetime" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 1 day aggregated historical data\n", + "one_day_historical_data = await get_historical_aggregated_values_batch_time_vars_async(\n", + " opc_data,\n", + " start_time=(datetime.datetime.now() - datetime.timedelta(30)),\n", + " 
end_time=(datetime.datetime.now() - datetime.timedelta(29)),\n", + " pro_interval=60*1000,\n", + " agg_name=\"Average\",\n", + " variable_list=string_sets.variables_as_list([\"DCPower\"]),\n", + " max_data_points=10000,\n", + " max_concurrent_requests=100\n", + ")\n", + "one_day_historical_data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Async with Data Handler" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import asyncio\n", + "import aiohttp\n", + "import pandas as pd\n", + "import sqlite3\n", + "import tempfile\n", + "import os\n", + "import json\n", + "from asyncio import Semaphore\n", + "from typing import List, Dict, Any\n", + "from datetime import datetime, timedelta\n", + "import logging\n", + "import pyarrow as pa\n", + "import pyarrow.parquet as pq" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class DataHandler:\n", + " def __init__(self, max_memory_rows=10000):\n", + " self.max_memory_rows = max_memory_rows\n", + " self.temp_dir = tempfile.mkdtemp()\n", + " self.db_path = os.path.join(self.temp_dir, 'temp_data.db')\n", + " self.conn = sqlite3.connect(self.db_path)\n", + " self.conn.execute('''CREATE TABLE IF NOT EXISTS temp_data\n", + " (id INTEGER PRIMARY KEY AUTOINCREMENT,\n", + " batch_id TEXT,\n", + " data TEXT)''')\n", + "\n", + " async def save_data(self, batch_id: str, data: pd.DataFrame):\n", + " if len(data) <= self.max_memory_rows:\n", + " # Store small datasets directly in SQLite\n", + " self.conn.execute(\"INSERT INTO temp_data (batch_id, data) VALUES (?, ?)\",\n", + " (batch_id, data.to_json()))\n", + " else:\n", + " # Stream larger datasets to Parquet file\n", + " file_path = os.path.join(self.temp_dir, f\"batch_{batch_id}.parquet\")\n", + " table = pa.Table.from_pandas(data)\n", + " pq.write_table(table, file_path)\n", + " \n", + " # Store file path in SQLite\n", + " self.conn.execute(\"INSERT INTO temp_data (batch_id, data) VALUES (?, ?)\",\n", + " (batch_id, file_path))\n", + " self.conn.commit()\n", + "\n", + " async def get_data(self, batch_id: str) -> pd.DataFrame:\n", + " cursor = self.conn.execute(\"SELECT data FROM temp_data WHERE batch_id = ?\", (batch_id,))\n", + " result = cursor.fetchone()\n", + " if result:\n", + " data = result[0]\n", + " if data.startswith('{'): # JSON data\n", + " return pd.read_json(data)\n", + " else: # File path\n", + " return pd.read_parquet(data)\n", + " return None\n", + "\n", + " def cleanup(self):\n", + " self.conn.close()\n", + " for file in os.listdir(self.temp_dir):\n", + " os.remove(os.path.join(self.temp_dir, file))\n", + " os.rmdir(self.temp_dir)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def get_historical_aggregated_values_batch_time_vars_data_async(\n", + " self,\n", + " start_time: datetime,\n", + " end_time: datetime,\n", + " pro_interval: int,\n", + " agg_name: str,\n", + " variable_list: List[str],\n", + " max_data_points: int = 1000,\n", + " max_retries: int = 3,\n", + " retry_delay: int = 5,\n", + " max_concurrent_requests: int = 10\n", + ") -> pd.DataFrame:\n", + " logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\n", + " logger = logging.getLogger(__name__)\n", + "\n", + " extended_variables = [{\"NodeId\": var, \"AggregateName\": agg_name} for var in variable_list]\n", + " total_time_range_ms = (end_time - 
start_time).total_seconds() * 1000\n", + " estimated_intervals = total_time_range_ms / pro_interval\n", + " max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))\n", + " variable_batches = [extended_variables[i:i + max_variables_per_batch] for i in range(0, len(extended_variables), max_variables_per_batch)]\n", + " max_time_batches = max(1, int(estimated_intervals / max_data_points))\n", + " time_batch_size_ms = total_time_range_ms / max_time_batches\n", + "\n", + " all_results = []\n", + " semaphore = Semaphore(max_concurrent_requests)\n", + " client_pool = ClientPool(max_concurrent_requests, self.rest_url, self.headers)\n", + " data_handler = DataHandler()\n", + "\n", + " async def process_batch(vid, variables, time_batch):\n", + " async with semaphore:\n", + " batch_start_ms = time_batch * time_batch_size_ms\n", + " batch_end_ms = min((time_batch + 1) * time_batch_size_ms, total_time_range_ms)\n", + " batch_start = start_time + timedelta(milliseconds=batch_start_ms)\n", + " batch_end = start_time + timedelta(milliseconds=batch_end_ms)\n", + "\n", + " body = {\n", + " **self.body,\n", + " \"StartTime\": batch_start.strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\"),\n", + " \"EndTime\": batch_end.strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\"),\n", + " \"ProcessingInterval\": pro_interval,\n", + " \"ReadValueIds\": variables,\n", + " \"AggregateName\": agg_name\n", + " }\n", + "\n", + " for attempt in range(max_retries):\n", + " try:\n", + " content = await request_from_api_async(\n", + " client_pool,\n", + " method=\"POST\",\n", + " endpoint=f\"/values/historicalaggregated\",\n", + " data=json.dumps(body, default=self.json_serial),\n", + " extended_timeout=True\n", + " )\n", + " break\n", + " except (aiohttp.ClientError, ValidationError) as e:\n", + " if attempt < max_retries - 1:\n", + " wait_time = retry_delay * (2 ** attempt)\n", + " logger.warning(f\"Request failed. Retrying in {wait_time} seconds...\")\n", + " await asyncio.sleep(wait_time)\n", + " else:\n", + " logger.error(f\"Max retries reached. 
Error: {e}\")\n", + " raise RuntimeError(f'Error message {e}')\n", + "\n", + " self._check_content(content)\n", + "\n", + " df_result = pd.json_normalize(\n", + " content, \n", + " record_path=['HistoryReadResults', 'DataValues'], \n", + " meta=[['HistoryReadResults', 'NodeId', 'IdType'], \n", + " ['HistoryReadResults', 'NodeId','Id'],\n", + " ['HistoryReadResults', 'NodeId','Namespace']]\n", + " )\n", + " batch_id = f\"{time_batch}_{vid}\"\n", + " await data_handler.save_data(batch_id, df_result)\n", + " return batch_id\n", + "\n", + " tasks = [\n", + " process_batch(vid,variables, time_batch)\n", + " for vid,variables in enumerate(variable_batches)\n", + " for time_batch in range(max_time_batches)\n", + " ]\n", + "\n", + " try:\n", + " batch_ids = await asyncio.gather(*tasks)\n", + " # for batch_id in batch_ids:\n", + " # df = await data_handler.get_data(batch_id)\n", + " # all_results.append(df)\n", + "\n", + " # logger.info(\"Combining all batches...\")\n", + " # combined_df = pd.concat(all_results, ignore_index=True)\n", + " # columns = {\n", + " # \"Value.Type\": \"ValueType\",\n", + " # \"Value.Body\": \"Value\",\n", + " # \"StatusCode.Symbol\": \"StatusSymbol\",\n", + " # \"StatusCode.Code\": \"StatusCode\",\n", + " # \"SourceTimestamp\": \"Timestamp\",\n", + " # \"HistoryReadResults.NodeId.IdType\": \"IdType\",\n", + " # \"HistoryReadResults.NodeId.Id\": \"Id\",\n", + " # \"HistoryReadResults.NodeId.Namespace\": \"Namespace\",\n", + " # }\n", + " # return self._process_df(combined_df, columns)\n", + " finally:\n", + " await client_pool.close_all()\n", + " data_handler.cleanup()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 1 day aggregated historical data\n", + "one_day_historical_data = await get_historical_aggregated_values_batch_time_vars_data_async(\n", + " opc_data,\n", + " start_time=start_time,\n", + " end_time=end_time,\n", + " pro_interval=pro_interval,\n", + " agg_name=agg_name,\n", + " variable_list=variable_list,\n", + " max_data_points=20000,\n", + " max_concurrent_requests=50\n", + ")\n", + "one_day_historical_data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Async with parquet data handler for large data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import asyncio\n", + "import aiohttp\n", + "import pandas as pd\n", + "import pyarrow as pa\n", + "import pyarrow.parquet as pq\n", + "from datetime import datetime, timedelta\n", + "import json\n", + "from typing import List, Dict, Any\n", + "import logging\n", + "from asyncio import Semaphore\n", + "from aiohttp import TCPConnector\n", + "from tenacity import retry, stop_after_attempt, wait_exponential\n", + "from concurrent.futures import ThreadPoolExecutor\n", + "\n", + "import tracemalloc\n", + "tracemalloc.start()\n", + "\n", + "logger = logging.getLogger(__name__)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class AsyncParquetWriter:\n", + " def __init__(self, filename):\n", + " self.filename = filename\n", + " self.writer = None\n", + " self.executor = ThreadPoolExecutor(max_workers=10)\n", + "\n", + " async def write(self, df):\n", + " loop = asyncio.get_running_loop()\n", + " table = pa.Table.from_pandas(df)\n", + " if self.writer is None:\n", + " self.writer = pq.ParquetWriter(self.filename, table.schema)\n", + " await loop.run_in_executor(self.executor, 
self.writer.write_table, table)\n", + "\n", + " async def close(self):\n", + " if self.writer:\n", + " loop = asyncio.get_running_loop()\n", + " await loop.run_in_executor(self.executor, self.writer.close)\n", + " self.writer = None\n", + "\n", + "class DataHandler:\n", + " def __init__(self, base_path):\n", + " self.base_path = base_path\n", + " self.writers = {}\n", + "\n", + " async def save_data(self, batch_id: str, data: pd.DataFrame):\n", + " if batch_id not in self.writers:\n", + " self.writers[batch_id] = AsyncParquetWriter(f\"{self.base_path}/batch_{batch_id}.parquet\")\n", + " await self.writers[batch_id].write(data)\n", + "\n", + " async def close_all(self):\n", + " for writer in self.writers.values():\n", + " await writer.close()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def get_historical_aggregated_values_batch_time_vars_data_async_parquet(\n", + " self,\n", + " start_time: datetime,\n", + " end_time: datetime,\n", + " pro_interval: int,\n", + " agg_name: str,\n", + " variable_list: List[str],\n", + " max_data_points: int = 100000,\n", + " max_retries: int = 3,\n", + " retry_delay: int = 5,\n", + " max_concurrent_requests: int = 50\n", + ") -> pd.DataFrame:\n", + " logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\n", + " logger = logging.getLogger(__name__)\n", + "\n", + " extended_variables = [{\"NodeId\": var, \"AggregateName\": agg_name} for var in variable_list]\n", + " total_time_range_ms = (end_time - start_time).total_seconds() * 1000\n", + " estimated_intervals = total_time_range_ms / pro_interval\n", + " max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))\n", + " variable_batches = [extended_variables[i:i + max_variables_per_batch] for i in range(0, len(extended_variables), max_variables_per_batch)]\n", + " max_time_batches = max(1, int(estimated_intervals / max_data_points))\n", + " time_batch_size_ms = total_time_range_ms / max_time_batches\n", + "\n", + " all_results = []\n", + " semaphore = Semaphore(max_concurrent_requests)\n", + " client_pool = ClientPool(max_concurrent_requests, self.rest_url, self.headers)\n", + " data_handler = DataHandler(base_path=\"pqfiles\")\n", + "\n", + " async def process_batch(vid, variables, time_batch):\n", + " async with semaphore:\n", + " batch_start_ms = time_batch * time_batch_size_ms\n", + " batch_end_ms = min((time_batch + 1) * time_batch_size_ms, total_time_range_ms)\n", + " batch_start = start_time + timedelta(milliseconds=batch_start_ms)\n", + " batch_end = start_time + timedelta(milliseconds=batch_end_ms)\n", + "\n", + " body = {\n", + " **self.body,\n", + " \"StartTime\": batch_start.strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\"),\n", + " \"EndTime\": batch_end.strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\"),\n", + " \"ProcessingInterval\": pro_interval,\n", + " \"ReadValueIds\": variables,\n", + " \"AggregateName\": agg_name\n", + " }\n", + "\n", + " for attempt in range(max_retries):\n", + " try:\n", + " content = await request_from_api_async(\n", + " client_pool,\n", + " method=\"POST\",\n", + " endpoint=f\"/values/historicalaggregated\",\n", + " data=json.dumps(body, default=self.json_serial),\n", + " extended_timeout=True\n", + " )\n", + " break\n", + " except (aiohttp.ClientError, ValidationError) as e:\n", + " if attempt < max_retries - 1:\n", + " wait_time = retry_delay * (2 ** attempt)\n", + " logger.warning(f\"Request failed. 
Retrying in {wait_time} seconds...\")\n", + " await asyncio.sleep(wait_time)\n", + " else:\n", + " logger.error(f\"Max retries reached. Error: {e}\")\n", + " raise RuntimeError(f'Error message {e}')\n", + "\n", + " self._check_content(content)\n", + "\n", + " df_result = pd.json_normalize(\n", + " content, \n", + " record_path=['HistoryReadResults', 'DataValues'], \n", + " meta=[['HistoryReadResults', 'NodeId', 'IdType'], \n", + " ['HistoryReadResults', 'NodeId','Id'],\n", + " ['HistoryReadResults', 'NodeId','Namespace']]\n", + " )\n", + " batch_id = f\"{time_batch}_{vid}\"\n", + " await data_handler.save_data(batch_id, df_result)\n", + " return batch_id\n", + "\n", + " tasks = [\n", + " process_batch(vid,variables, time_batch)\n", + " for vid,variables in enumerate(variable_batches)\n", + " for time_batch in range(max_time_batches)\n", + " ]\n", + "\n", + " try:\n", + " batch_ids = await asyncio.gather(*tasks)\n", + " # for batch_id in batch_ids:\n", + " # df = await data_handler.get_data(batch_id)\n", + " # all_results.append(df)\n", + "\n", + " # logger.info(\"Combining all batches...\")\n", + " # combined_df = pd.concat(all_results, ignore_index=True)\n", + " # columns = {\n", + " # \"Value.Type\": \"ValueType\",\n", + " # \"Value.Body\": \"Value\",\n", + " # \"StatusCode.Symbol\": \"StatusSymbol\",\n", + " # \"StatusCode.Code\": \"StatusCode\",\n", + " # \"SourceTimestamp\": \"Timestamp\",\n", + " # \"HistoryReadResults.NodeId.IdType\": \"IdType\",\n", + " # \"HistoryReadResults.NodeId.Id\": \"Id\",\n", + " # \"HistoryReadResults.NodeId.Namespace\": \"Namespace\",\n", + " # }\n", + " # return self._process_df(combined_df, columns)\n", + " finally:\n", + " await client_pool.close_all()\n", + " await data_handler.close_all()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 1 day aggregated historical data\n", + "one_day_historical_data = await get_historical_aggregated_values_batch_time_vars_data_async_parquet(\n", + " opc_data,\n", + " start_time=datetime(2024,6,1,00,00),\n", + " end_time=datetime(2024,6,2,00,00),\n", + " pro_interval=pro_interval,\n", + " agg_name=agg_name,\n", + " variable_list=variable_list,\n", + " max_data_points=50000,\n", + " max_concurrent_requests=50\n", + ")\n", + "one_day_historical_data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Stringset data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_historical_aggregated_values(opc_data,\n", + " start_time, \n", + " end_time, \n", + " pro_interval, \n", + " agg_name, \n", + " variable_list\n", + ") -> pd.DataFrame:\n", + " \n", + " vars = opc_data._get_variable_list_as_list(variable_list)\n", + " extended_variables = [{\"NodeId\": var, \"AggregateName\": agg_name} for var in vars]\n", + "\n", + " body = {\n", + " **opc_data.body, \n", + " \"StartTime\": start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"), \n", + " \"EndTime\": end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"), \n", + " \"ProcessingInterval\": pro_interval, \n", + " \"AggregateName\": agg_name,\n", + " \"ReadValueIds\": extended_variables\n", + " }\n", + " print(body)\n", + "\n", + " content = request_from_api(\n", + " rest_url=opcua_rest_url, \n", + " method=\"POST\", \n", + " endpoint=\"values/historicalaggregated\", \n", + " data=json.dumps(body, default=opc_data.json_serial), \n", + " headers=opc_data.headers, \n", + " extended_timeout=True\n", + " )\n", + " print(content)\n", + " 
df_result = pd.json_normalize(\n", + " content, \n", + " record_path=['HistoryReadResults', 'DataValues'], \n", + " meta=[['HistoryReadResults', 'NodeId', 'IdType'], ['HistoryReadResults', 'NodeId','Id'],['HistoryReadResults', 'NodeId','Namespace']\n", + " ]\n", + " )\n", + " columns = {\n", + " \"Value.Type\": \"ValueType\",\n", + " \"Value.Body\": \"Value\",\n", + " \"StatusCode.Symbol\": \"StatusSymbol\",\n", + " \"StatusCode.Code\": \"StatusCode\",\n", + " \"SourceTimestamp\": \"Timestamp\",\n", + " \"HistoryReadResults.NodeId.IdType\": \"IdType\",\n", + " \"HistoryReadResults.NodeId.Id\": \"Id\",\n", + " \"HistoryReadResults.NodeId.Namespace\": \"Namespace\",\n", + " }\n", + " return opc_data._process_df(df_result, columns)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "start_time=(datetime.datetime.now() - datetime.timedelta(30))\n", + "end_time=(datetime.datetime.now() - datetime.timedelta(29))\n", + "pro_interval=600000\n", + "agg_name=\"Average\"\n", + "variable_list=string_sets.variables_as_list([\"DCPower\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_historical_aggregated_values(opc_data,\n", + " start_time, \n", + " end_time, \n", + " pro_interval, \n", + " agg_name, \n", + " variable_list) -> pd.DataFrame:\n", + " vars = opc_data._get_variable_list_as_list(variable_list)\n", + " batch_size = 100\n", + " batches = [vars[i:i + batch_size] for i in range(0, len(vars), batch_size)]\n", + " \n", + " combined_df = pd.DataFrame() \n", + " \n", + " for batch in batches:\n", + " extended_variables = [{\"NodeId\": var, \"AggregateName\": agg_name} for var in batch]\n", + " \n", + " body = {\n", + " **opc_data.body, \n", + " \"StartTime\": start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"), \n", + " \"EndTime\": end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"), \n", + " \"ProcessingInterval\": pro_interval, \n", + " \"AggregateName\": agg_name,\n", + " \"ReadValueIds\": extended_variables\n", + " }\n", + " \n", + " content = request_from_api(\n", + " rest_url=opcua_rest_url, \n", + " method=\"POST\", \n", + " endpoint=\"values/historicalaggregated\", \n", + " data=json.dumps(body, default=opc_data.json_serial), \n", + " headers=opc_data.headers, \n", + " extended_timeout=True\n", + " )\n", + " \n", + " df_result = pd.json_normalize(\n", + " content, \n", + " record_path=['HistoryReadResults', 'DataValues'], \n", + " meta=[['HistoryReadResults', 'NodeId', 'IdType'], ['HistoryReadResults', 'NodeId','Id'],['HistoryReadResults', 'NodeId','Namespace']]\n", + " )\n", + " \n", + " if combined_df.empty:\n", + " combined_df = df_result\n", + " else:\n", + " combined_df = pd.concat([combined_df, df_result], ignore_index=True)\n", + " \n", + " columns = {\n", + " \"Value.Type\": \"ValueType\",\n", + " \"Value.Body\": \"Value\",\n", + " \"StatusCode.Symbol\": \"StatusSymbol\",\n", + " \"StatusCode.Code\": \"StatusCode\",\n", + " \"SourceTimestamp\": \"Timestamp\",\n", + " \"HistoryReadResults.NodeId.IdType\": \"IdType\",\n", + " \"HistoryReadResults.NodeId.Id\": \"Id\",\n", + " \"HistoryReadResults.NodeId.Namespace\": \"Namespace\",\n", + " }\n", + " \n", + " return opc_data._process_df(combined_df, columns)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "get_historical_aggregated_values(opc_data,\n", + " start_time, \n", + " end_time, \n", + " pro_interval, \n", + " agg_name, \n", + " 
variable_list)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import hashlib\n", + "import concurrent.futures" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_historical_aggregated_values(opc_data, start_time, end_time, pro_interval, agg_name, variable_list) -> pd.DataFrame:\n", + " vars = opc_data._get_variable_list_as_list(variable_list)\n", + " batch_size = 150\n", + " batches = [vars[i:i + batch_size] for i in range(0, len(vars), batch_size)]\n", + "\n", + " def process_batch(batch):\n", + " extended_variables = [{\"NodeId\": var, \"AggregateName\": agg_name} for var in batch]\n", + " body = {\n", + " **opc_data.body,\n", + " \"StartTime\": start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n", + " \"EndTime\": end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n", + " \"ProcessingInterval\": pro_interval,\n", + " \"AggregateName\": agg_name,\n", + " \"ReadValueIds\": extended_variables\n", + " }\n", + " content = request_from_api(\n", + " rest_url=opcua_rest_url,\n", + " method=\"POST\",\n", + " endpoint=\"values/historicalaggregated\",\n", + " data=json.dumps(body, default=opc_data.json_serial),\n", + " headers=opc_data.headers,\n", + " extended_timeout=True\n", + " )\n", + " return pd.json_normalize(\n", + " content,\n", + " record_path=['HistoryReadResults', 'DataValues'],\n", + " meta=[['HistoryReadResults', 'NodeId', 'IdType'], ['HistoryReadResults', 'NodeId', 'Id'], ['HistoryReadResults', 'NodeId', 'Namespace']]\n", + " )\n", + "\n", + " dataframes = []\n", + " with concurrent.futures.ThreadPoolExecutor() as executor:\n", + " future_to_batch = {executor.submit(process_batch, batch): batch for batch in batches}\n", + " for future in concurrent.futures.as_completed(future_to_batch):\n", + " dataframes.append(future.result())\n", + "\n", + " combined_df = pd.concat(dataframes, ignore_index=True) if dataframes else pd.DataFrame()\n", + "\n", + " columns = {\n", + " \"Value.Type\": \"ValueType\",\n", + " \"Value.Body\": \"Value\",\n", + " \"StatusCode.Symbol\": \"StatusSymbol\",\n", + " \"StatusCode.Code\": \"StatusCode\",\n", + " \"SourceTimestamp\": \"Timestamp\",\n", + " \"HistoryReadResults.NodeId.IdType\": \"IdType\",\n", + " \"HistoryReadResults.NodeId.Id\": \"Id\",\n", + " \"HistoryReadResults.NodeId.Namespace\": \"Namespace\",\n", + " }\n", + "\n", + " return opc_data._process_df(combined_df, columns)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "vars = opc_data._get_variable_list_as_list(variable_list)\n", + "extended_variables = [{\"NodeId\": var, \"AggregateName\": agg_name} for var in vars]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "body = {\n", + " **opc_data.body,\n", + " \"StartTime\": start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n", + " \"EndTime\": end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n", + " \"ProcessingInterval\": pro_interval,\n", + " \"AggregateName\": agg_name,\n", + " \"ReadValueIds\": extended_variables\n", + "}\n", + "body" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "get_historical_aggregated_values(opc_data,\n", + " start_time, \n", + " end_time, \n", + " pro_interval, \n", + " agg_name, \n", + " variable_list)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + 
"source": [ + "start_time = datetime.now() - relativedelta(months=1)\n", + "end_time = datetime.now()\n", + "get_historical_aggregated_values(opc_data,\n", + " start_time, \n", + " end_time, \n", + " pro_interval, \n", + " agg_name, \n", + " variable_list)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# History data for 1 day, 10 min aggregate - stringsets\n", + "history_agg = opc_data.get_historical_aggregated_values(\n", + " start_time=(datetime.datetime.now() - datetime.timedelta(30)),\n", + " end_time=(datetime.datetime.now() - datetime.timedelta(29)),\n", + " pro_interval=600000,\n", + " agg_name=\"Average\",\n", + " variable_list=inverters.variables_as_list([\"DCPower\"]),\n", + ")\n", + "history_agg" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import copy\n", + "import math\n", + "from pydantic import BaseModel, AnyUrl\n", + "from datetime import timedelta\n", + "import asyncio\n", + "import aiohttp" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class Variables(BaseModel):\n", + " \"\"\"Helper class to parse all values api's.\n", + " Variables are described in https://reference.opcfoundation.org/v104/Core/docs/Part3/8.2.1/\n", + "\n", + " Variables:\n", + " Id: str - Id of the signal, e.g. SSO.EG-AS.WeatherSymbol\n", + " Namespace: int - Namespace on the signal, e.g. 2.\n", + " IdType: int - IdTypes described in https://reference.opcfoundation.org/v104/Core/docs/Part3/8.2.3/.\n", + " \"\"\"\n", + " Id: str\n", + " Namespace: int\n", + " IdType: int" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def make_async_api_request(opc_data, start_time: datetime, end_time: datetime, pro_interval: int, agg_name: str, variable_list: list[Variables]) -> dict:\n", + " \"\"\"Make API request for the given time range and variable list\"\"\"\n", + "\n", + " # Creating a new variable list to remove pydantic models\n", + " vars = opc_data._get_variable_list_as_list(variable_list)\n", + "\n", + " extended_variables = [\n", + " {\n", + " \"NodeId\": var,\n", + " \"AggregateName\": agg_name,\n", + " }\n", + " for var in vars\n", + " ]\n", + " body = copy.deepcopy(opc_data.body)\n", + " body[\"StartTime\"] = start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", + " body[\"EndTime\"] = end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", + " body[\"ProcessingInterval\"] = pro_interval\n", + " body[\"ReadValueIds\"] = extended_variables\n", + " body[\"AggregateName\"] = agg_name\n", + "\n", + " # Make API request using aiohttp session\n", + " async with aiohttp.ClientSession() as session:\n", + " async with session.post(\n", + " f\"{opcua_rest_url}values/historicalaggregated\",\n", + " data=json.dumps(body, default=opc_data.json_serial),\n", + " headers=opc_data.headers,\n", + " timeout=aiohttp.ClientTimeout(total=None) \n", + " ) as response:\n", + " response.raise_for_status()\n", + " content = await response.json()\n", + "\n", + " return content" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "vars = opc_data._get_variable_list_as_list(variable_list)\n", + "vars1 = vars[0:5]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "extended_variables = [\n", + " {\n", + " \"NodeId\": var,\n", + " \"AggregateName\": 
agg_name,\n", + " }\n", + " for var in vars1\n", + "]\n", + "len(extended_variables)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "body = copy.deepcopy(opc_data.body)\n", + "body[\"StartTime\"] = start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", + "body[\"EndTime\"] = end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", + "body[\"ProcessingInterval\"] = pro_interval\n", + "body[\"ReadValueIds\"] = extended_variables\n", + "body[\"AggregateName\"] = agg_name\n", + "body" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "f\"{opcua_rest_url}values/historicalaggregated\"," + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "data=json.dumps(body, default=opc_data.json_serial)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "data_dict = json.loads(data)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "read_value_ids = data_dict['ReadValueIds']" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "len(read_value_ids)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "headers=opc_data.headers\n", + "headers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "timeout=aiohttp.ClientTimeout(total=None) \n", + "timeout" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async with aiohttp.ClientSession() as session:\n", + " async with session.post(\n", + " f\"{opcua_rest_url}values/historicalaggregated\",\n", + " data=json.dumps(body, default=opc_data.json_serial),\n", + " headers=opc_data.headers,\n", + " timeout=aiohttp.ClientTimeout(total=None) \n", + " ) as response:\n", + " response.raise_for_status()\n", + " content = await response.json()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "content" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def generate_time_batches(start_time: datetime, end_time: datetime, pro_interval: int, batch_size: int) -> list[tuple]:\n", + " \"\"\"Generate time batches based on start time, end time, processing interval, and batch size\"\"\"\n", + "\n", + " total_time_range = end_time - start_time\n", + " pro_interval_seconds = (pro_interval / 1000)\n", + " total_data_points = (total_time_range.total_seconds() // pro_interval_seconds) + 1\n", + "\n", + " total_batches = math.ceil(total_data_points / batch_size)\n", + " actual_batch_size = math.ceil(total_data_points / total_batches)\n", + "\n", + " time_batches = [\n", + " (start_time + timedelta(seconds=(i * actual_batch_size * pro_interval_seconds)),\n", + " start_time + timedelta(seconds=((i + 1) * actual_batch_size * pro_interval_seconds)) - timedelta(seconds=pro_interval_seconds))\n", + " for i in range(total_batches)\n", + " ]\n", + "\n", + " return time_batches" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def 
generate_variable_batches(variable_list: list[Variables], batch_size: int) -> list[list[Variables]]:\n", + " \"\"\"Generate variable batches based on the variable list and batch size\"\"\"\n", + "\n", + " variable_batches = [\n", + " variable_list[i:i + batch_size] for i in range(0, len(variable_list), batch_size)\n", + " ]\n", + "\n", + " return variable_batches" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def process_api_response(opc_data, response: dict) -> pd.DataFrame:\n", + " \"\"\"Process the API response and return the result dataframe\"\"\"\n", + " \n", + " df_result = pd.json_normalize(response, record_path=['HistoryReadResults', 'DataValues'], \n", + " meta=[['HistoryReadResults', 'NodeId', 'IdType'], ['HistoryReadResults', 'NodeId','Id'],\n", + " ['HistoryReadResults', 'NodeId','Namespace']] )\n", + "\n", + " for i, row in df_result.iterrows():\n", + " if not math.isnan(row[\"Value.Type\"]):\n", + " value_type = opc_data._get_value_type(int(row[\"Value.Type\"])).get(\"type\")\n", + " df_result.at[i, \"Value.Type\"] = str(value_type)\n", + "\n", + " df_result.rename(\n", + " columns={\n", + " \"Value.Type\": \"ValueType\",\n", + " \"Value.Body\": \"Value\",\n", + " \"StatusCode.Symbol\": \"StatusSymbol\",\n", + " \"StatusCode.Code\": \"StatusCode\",\n", + " \"SourceTimestamp\": \"Timestamp\",\n", + " \"HistoryReadResults.NodeId.IdType\": \"Id\",\n", + " \"HistoryReadResults.NodeId.Namespace\": \"Namespace\",\n", + " },\n", + " errors=\"raise\",\n", + " inplace=True,\n", + " )\n", + "\n", + " return df_result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def get_historical_aggregated_values_async(\n", + " opc_data,\n", + " start_time: datetime,\n", + " end_time: datetime,\n", + " pro_interval: int,\n", + " agg_name: str,\n", + " variable_list: list[Variables],\n", + " batch_size: int = 1000\n", + ") -> pd.DataFrame:\n", + " \"\"\"Request historical aggregated values from the OPC UA server with batching\"\"\"\n", + "\n", + " \n", + " time_batches = generate_time_batches(start_time, end_time, pro_interval, batch_size)\n", + " variable_batches = generate_variable_batches(variable_list, batch_size)\n", + "\n", + " # Creating tasks for each API request and gathering the results\n", + " tasks = []\n", + "\n", + " for time_batch_start, time_batch_end in time_batches:\n", + " for variable_sublist in variable_batches:\n", + " task = asyncio.create_task(\n", + " make_async_api_request(opc_data, time_batch_start, time_batch_end, pro_interval, agg_name, variable_sublist)\n", + " ) \n", + " tasks.append(task)\n", + " \n", + " # Execute all tasks concurrently and gather their results\n", + " responses = await asyncio.gather(*tasks)\n", + " \n", + " # Processing the API responses\n", + " result_list = []\n", + " for idx, batch_response in enumerate(responses):\n", + " \n", + " batch_result = process_api_response(opc_data, batch_response)\n", + " result_list.append(batch_result)\n", + " \n", + " result_df = pd.concat(result_list, ignore_index=True)\n", + "\n", + " return result_df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 1 day aggregated historical inverter data in asyncio process\n", + "one_days_historic_inverter_data2 = await get_historical_aggregated_values_async(\n", + " opc_data,\n", + " start_time=(datetime.datetime.now() - datetime.timedelta(30)),\n", + " 
end_time=(datetime.datetime.now() - datetime.timedelta(29)),\n", + " pro_interval=60*1000,\n", + " agg_name=\"Average\",\n", + " variable_list=string_sets.variables_as_list([\"DCPower\"]),\n", + " batch_size=100\n", + ")\n", + "one_days_historic_inverter_data2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def generate_time_chunks(start_time: datetime, end_time: datetime):\n", + " \"\"\"Generate time chunks between start_time and end_time, each chunk_duration_minutes long.\"\"\"\n", + " delta = timedelta(minutes=60)\n", + " current_time = start_time\n", + " while current_time < end_time:\n", + " chunk_end_time = min(current_time + delta, end_time)\n", + " yield (current_time, chunk_end_time)\n", + " current_time = chunk_end_time" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def make_async_api_request(opc_data, start_time: datetime, end_time: datetime, pro_interval: int, agg_name: str, variable_list: list[Variables], max_data_points=500) -> dict:\n", + " \"\"\"Make API request for the given time range and variable list, with additional chunking based on data points.\"\"\"\n", + "\n", + " def chunk_list(lst, n):\n", + " \"\"\"Yield successive n-sized chunks from lst.\"\"\"\n", + " for i in range(0, len(lst), n):\n", + " yield lst[i:i + n]\n", + "\n", + " async def fetch_data_for_time_period(session, vars_chunk, start, end):\n", + " \"\"\"Fetch data for a given time period and chunk of variables.\"\"\"\n", + " extended_variables = [{\"NodeId\": var, \"AggregateName\": agg_name} for var in vars_chunk]\n", + " body = copy.deepcopy(opc_data.body)\n", + " body[\"StartTime\"] = start.strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", + " body[\"EndTime\"] = end.strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", + " body[\"ProcessingInterval\"] = pro_interval\n", + " body[\"ReadValueIds\"] = extended_variables\n", + " body[\"AggregateName\"] = agg_name\n", + "\n", + " async with session.post(\n", + " f\"{opcua_rest_url}values/historicalaggregated\",\n", + " data=json.dumps(body, default=str),\n", + " headers=opc_data.headers,\n", + " timeout=aiohttp.ClientTimeout(total=None)\n", + " ) as response:\n", + " response.raise_for_status()\n", + " return await response.json()\n", + "\n", + " # Creating a new variable list to remove pydantic models\n", + " vars = opc_data._get_variable_list_as_list(variable_list)\n", + " chunk_size = 5 # Chunk size for node IDs\n", + " vars_chunks = list(chunk_list(vars, chunk_size))\n", + "\n", + " all_responses = []\n", + " async with aiohttp.ClientSession() as session:\n", + " for vars_chunk in vars_chunks:\n", + " # Generate time chunks for the given time period\n", + " async for start, end in generate_time_chunks(start_time, end_time):\n", + " content = await fetch_data_for_time_period(session, vars_chunk, start, end)\n", + " all_responses.append(content)\n", + " return all_responses" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def make_async_api_request(opc_data, start_time: datetime, end_time: datetime, pro_interval: int, agg_name: str, variable_list: list[Variables]) -> dict:\n", + " \"\"\"Make API request for the given time range and variable list\"\"\"\n", + "\n", + " def chunk_list(lst, n):\n", + " for i in range(0, len(lst), n):\n", + " yield lst[i:i + n]\n", + "\n", + " # Creating a new variable list to remove pydantic models\n", + " vars = 
opc_data._get_variable_list_as_list(variable_list)\n", + "\n", + " chunk_size = 150 \n", + " vars_chunks = list(chunk_list(vars, chunk_size))\n", + "\n", + " all_responses = []\n", + " async with aiohttp.ClientSession() as session:\n", + " for vars_chunk in vars_chunks:\n", + " extended_variables = [{\"NodeId\": var, \"AggregateName\": agg_name} for var in vars_chunk]\n", + " body = copy.deepcopy(opc_data.body)\n", + " body[\"StartTime\"] = start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", + " body[\"EndTime\"] = end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", + " body[\"ProcessingInterval\"] = pro_interval\n", + " body[\"ReadValueIds\"] = extended_variables\n", + " body[\"AggregateName\"] = agg_name\n", + "\n", + " async with session.post(\n", + " f\"{opcua_rest_url}values/historicalaggregated\",\n", + " data=json.dumps(body, default=str),\n", + " headers=opc_data.headers,\n", + " timeout=aiohttp.ClientTimeout(total=None)\n", + " ) as response:\n", + " response.raise_for_status()\n", + " content = await response.json()\n", + " all_responses.append(content) \n", + "\n", + " return all_responses" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import datetime, timedelta\n", + "from typing import List, Tuple" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def generate_time_chunks(start_time: datetime, end_time: datetime, interval_hours: int) -> List[Tuple[datetime, datetime]]:\n", + " \"\"\"Generate time chunks within the given start and end time with specified interval in hours.\"\"\"\n", + " delta = timedelta(hours=interval_hours)\n", + " current_time = start_time\n", + " chunks = []\n", + "\n", + " while current_time < end_time:\n", + " chunk_end_time = min(current_time + delta, end_time) \n", + " chunks.append((current_time, chunk_end_time))\n", + " current_time += delta\n", + "\n", + " return chunks" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 1 day aggregated historical inverter data in asyncio process\n", + "one_days_historic_inverter_data2 = await make_async_api_request(\n", + " opc_data,\n", + " start_time=(datetime.datetime.now() - datetime.timedelta(30)),\n", + " end_time=(datetime.datetime.now() - datetime.timedelta(29)),\n", + " pro_interval=60*1000,\n", + " agg_name=\"Average\",\n", + " variable_list=string_sets.variables_as_list([\"DCPower\"])\n", + ")\n", + "one_days_historic_inverter_data2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.12.1 64-bit", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + }, + "orig_nbformat": 4, + "vscode": { + "interpreter": { + "hash": "6b866f0bc560289bf4bb2415ae9074243764eb008c10d00a1da29433677418de" + } + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/requirements.txt b/requirements.txt index 664a1fd..1a91227 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,9 @@ +nest_asyncio sphinx_rtd_theme tox ipykernel --e . -python-dotenv \ No newline at end of file +tqdm +tenacity +pytest-asyncio +python-dotenv +-e . 
\ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 275de38..cc107ee 100644 --- a/setup.cfg +++ b/setup.cfg @@ -71,6 +71,7 @@ testing = setuptools pytest pytest-cov + nest_asyncio pyPrediktorUtilities >= 0.4.1 pyodbc diff --git a/src/pyprediktormapclient/auth_client.py b/src/pyprediktormapclient/auth_client.py index c961a01..b68ddc4 100644 --- a/src/pyprediktormapclient/auth_client.py +++ b/src/pyprediktormapclient/auth_client.py @@ -118,7 +118,7 @@ def check_if_token_has_expired(self) -> bool: if self.token is None or self.token.expires_at is None: return True - return datetime.datetime.utcnow() > self.token.expires_at + return datetime.datetime.now(datetime.timezone.utc) > self.token.expires_at def request_new_ory_token(self) -> None: """Request Ory token diff --git a/src/pyprediktormapclient/opc_ua.py b/src/pyprediktormapclient/opc_ua.py index 1a8977f..0dedce8 100644 --- a/src/pyprediktormapclient/opc_ua.py +++ b/src/pyprediktormapclient/opc_ua.py @@ -1,18 +1,21 @@ import json -import math import logging -import datetime import copy import pandas as pd +import requests from datetime import date, datetime, timedelta -from typing import Dict, List, Any, Union, Optional -from pydantic import BaseModel, AnyUrl, validate_call +from typing import Dict, List, Any, Union, Optional, Callable +from pydantic import BaseModel, AnyUrl from pydantic_core import Url from pyprediktormapclient.shared import request_from_api from requests import HTTPError import asyncio -import requests import aiohttp +from aiohttp import ClientSession +from asyncio import Semaphore +import nest_asyncio + +nest_asyncio.apply() logger = logging.getLogger(__name__) @@ -122,6 +125,13 @@ class WriteReturn(BaseModel): TimeStamp: str Success: bool + +class AsyncIONotebookHelper: + @staticmethod + def run_coroutine(coroutine): + loop = asyncio.get_event_loop() + return loop.run_until_complete(coroutine) + class Config: arbitrary_types_allowed = True @@ -144,7 +154,6 @@ class OPC_UA: class Config: arbitrary_types_allowed = True - def __init__(self, rest_url: AnyUrl, opcua_url: AnyUrl, namespaces: List = None, auth_client: object = None, session: requests.Session = None): """Class initializer @@ -165,6 +174,7 @@ def __init__(self, rest_url: AnyUrl, opcua_url: AnyUrl, namespaces: List = None, } self.auth_client = auth_client self.session = session + self.helper = AsyncIONotebookHelper() if self.auth_client is not None: if self.auth_client.token is not None: @@ -335,126 +345,178 @@ def _process_df(self, df_result: pd.DataFrame, columns: Dict[str, str]) -> pd.Da return df_result - def _prepare_body(self, start_time: datetime, end_time: datetime, variables: List[Dict[str, str]], additional_params: Dict = None) -> Dict: - """ - Prepare the request body for the API call. - """ - body = { - **self.body, - "StartTime": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"), - "EndTime": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"), - "ReadValueIds": variables - } - if additional_params: - body.update(additional_params) - return body - - def _fetch_data(self, endpoint: str, body: Dict) -> pd.DataFrame: - """ - Fetch data from the API and return it as a DataFrame. 
- """ - try: - content = request_from_api( - rest_url=self.rest_url, - method="POST", - endpoint=endpoint, - data=json.dumps(body, default=self.json_serial), - headers=self.headers, - extended_timeout=True - ) - except HTTPError as e: - if self.auth_client is not None: - self.check_auth_client(json.loads(e.response.content)) - else: - raise RuntimeError(f'Error message {e}') + async def _make_request(self, endpoint: str, body: dict, max_retries: int, retry_delay: int): + for attempt in range(max_retries): + try: + logging.info(f"Attempt {attempt + 1} of {max_retries}") + async with ClientSession() as session: + url = f"{self.rest_url}{endpoint}" + logging.info(f"Making POST request to {url}") + logging.debug(f"Request body: {body}") + logging.debug(f"Request headers: {self.headers}") + + async with session.post(url, json=body, headers=self.headers) as response: + logging.info(f"Response received: Status {response.status}") + + if response.status >= 400: + error_text = await response.text() + logging.error(f"HTTP error {response.status}: {error_text}") + response.raise_for_status() + + return await response.json() + + except aiohttp.ClientResponseError as e: + if e.status == 500: + logging.error(f"Server Error: {e}") + raise # For 500 errors, we might want to fail fast + logging.error(f"ClientResponseError: {e}") + except aiohttp.ClientError as e: + logging.error(f"ClientError in POST request: {e}") + except Exception as e: + logging.error(f"Unexpected error in _make_request: {e}") + + if attempt < max_retries - 1: + wait_time = retry_delay * (2 ** attempt) + logging.warning(f"Request failed. Retrying in {wait_time} seconds...") + await asyncio.sleep(wait_time) + else: + logging.error(f"Max retries reached.") + raise RuntimeError('Max retries reached') + + def _process_content(self, content: dict) -> pd.DataFrame: self._check_content(content) + df_list = [] + for item in content["HistoryReadResults"]: + df = pd.json_normalize(item["DataValues"]) + for key, value in item["NodeId"].items(): + df[f"HistoryReadResults.NodeId.{key}"] = value + df_list.append(df) + + if df_list: + df_result = pd.concat(df_list) + df_result.reset_index(inplace=True, drop=True) + return df_result + + async def get_historical_values( + self, + start_time: datetime, + end_time: datetime, + variable_list: List[str], + endpoint: str, + prepare_variables: Callable[[List[str]], List[dict]], + additional_params: dict = None, + max_data_points: int = 10000, + max_retries: int = 3, + retry_delay: int = 5, + max_concurrent_requests: int = 30 + ) -> pd.DataFrame: + """Generic method to request historical values from the OPC UA server with batching""" + total_time_range_ms = (end_time - start_time).total_seconds() * 1000 + estimated_intervals = total_time_range_ms / max_data_points + + max_variables_per_batch = max(1, int(max_data_points / estimated_intervals)) + max_time_batches = max(1, int(estimated_intervals / max_data_points)) + time_batch_size_ms = total_time_range_ms / max_time_batches + + extended_variables = prepare_variables(variable_list) + variable_batches = [extended_variables[i:i + max_variables_per_batch] for i in range(0, len(extended_variables), max_variables_per_batch)] + + semaphore = Semaphore(max_concurrent_requests) + + async def process_batch(variables, time_batch): + async with semaphore: + batch_start_ms = time_batch * time_batch_size_ms + batch_end_ms = min((time_batch + 1) * time_batch_size_ms, total_time_range_ms) + batch_start = start_time + timedelta(milliseconds=batch_start_ms) + batch_end = 
start_time + timedelta(milliseconds=batch_end_ms) + + body = { + **self.body, + "StartTime": batch_start.isoformat() + "Z", + "EndTime": batch_end.isoformat() + "Z", + "ReadValueIds": variables, + **(additional_params or {}) + } + + content = await self._make_request(endpoint, body, max_retries, retry_delay) + return self._process_content(content) + + tasks = [ + process_batch(variables, time_batch) + for variables in variable_batches + for time_batch in range(max_time_batches) + ] - def process_content(content): - df = pd.json_normalize(content["DataValues"]) - - for i, j in content["NodeId"].items(): - df[f"HistoryReadResults.NodeId.{i}"] = j - - return df - - df_result = pd.concat((process_content(r) for r in content["HistoryReadResults"])) - df_result.reset_index(inplace=True, drop=True) + results = await asyncio.gather(*tasks) + combined_df = pd.concat(results, ignore_index=True) + return combined_df - return df_result - def get_historical_raw_values(self, - start_time: datetime, - end_time: datetime, - variable_list: List[Variables], - limit_start_index: Union[int, None] = None, - limit_num_records: Union[int, None] = None + async def get_raw_historical_values_asyn( + self, + start_time: datetime, + end_time: datetime, + variable_list: List[str], + limit_start_index: Union[int, None] = None, + limit_num_records: Union[int, None] = None, + **kwargs ) -> pd.DataFrame: - """ - Get historical raw values from the OPC UA server. + """Request raw historical values from the OPC UA server""" - Args: - start_time (datetime): The start time of the requested data. - end_time (datetime): The end time of the requested data. - variable_list (list): A list of variables to request. - limit_start_index (int, optional): The start index for limiting the number of records. Defaults to None. - limit_num_records (int, optional): The number of records to limit to. Defaults to None. - - Returns: - pd.DataFrame: A DataFrame containing the historical raw values. - """ - vars = self._get_variable_list_as_list(variable_list) - extended_variables = [{"NodeId": var} for var in vars] - additional_params = {} if limit_start_index is not None and limit_num_records is not None: additional_params["Limit"] = {"StartIndex": limit_start_index, "NumRecords": limit_num_records} - - body = self._prepare_body(start_time, end_time, extended_variables, additional_params) - df_result = self._fetch_data("values/historical", body) + combined_df = await self.get_historical_values( + start_time, + end_time, + variable_list, + "values/historical", + lambda vars: [{"NodeId": var} for var in vars], + additional_params, + **kwargs + ) columns = { "Value.Type": "ValueType", "Value.Body": "Value", "SourceTimestamp": "Timestamp", "HistoryReadResults.NodeId.IdType": "IdType", "HistoryReadResults.NodeId.Id": "Id", - "HistoryReadResults.NodeId.Namespace": "Namespace", + "HistoryReadResults.NodeId.Namespace": "Namespace" } - return self._process_df(df_result, columns) - - def get_historical_aggregated_values(self, - start_time: datetime, - end_time: datetime, - pro_interval: int, - agg_name: str, - variable_list: List[Variables] - ) -> pd.DataFrame: - """ - Request historical aggregated values from the OPC UA server. - - Args: - start_time (datetime): Start time of requested data. - end_time (datetime): End time of requested data. - pro_interval (int): Interval time of processing in milliseconds. - agg_name (str): Name of aggregation. 
- variable_list (List[Variables]): A list of variables you want, containing keys "Id", "Namespace" and "IdType". + return self._process_df(combined_df, columns) + + def get_raw_historical_values(self, *args, **kwargs): + result = self.helper.run_coroutine(self.get_raw_historical_values_asyn(*args, **kwargs)) + return result + - Returns: - pd.DataFrame: DataFrame with the historical aggregated values. Columns in the DataFrame are "StatusCode", - "StatusSymbol", "ValueType", "Value", "Timestamp", "IdType", "Id", "Namespace". - """ - vars = self._get_variable_list_as_list(variable_list) - extended_variables = [{"NodeId": var, "AggregateName": agg_name} for var in vars] + async def get_historical_aggregated_values_asyn( + self, + start_time: datetime, + end_time: datetime, + pro_interval: int, + agg_name: str, + variable_list: List[str], + **kwargs + ) -> pd.DataFrame: + """Request historical aggregated values from the OPC UA server""" additional_params = { - "ProcessingInterval": pro_interval, + "ProcessingInterval": pro_interval, "AggregateName": agg_name } - - body = self._prepare_body(start_time, end_time, extended_variables, additional_params) - df_result = self._fetch_data("values/historicalaggregated", body) - + + combined_df = await self.get_historical_values( + start_time, + end_time, + variable_list, + "values/historicalaggregated", + lambda vars: [{"NodeId": var, "AggregateName": agg_name} for var in vars], + additional_params, + **kwargs + ) columns = { "Value.Type": "ValueType", "Value.Body": "Value", @@ -463,157 +525,14 @@ def get_historical_aggregated_values(self, "SourceTimestamp": "Timestamp", "HistoryReadResults.NodeId.IdType": "IdType", "HistoryReadResults.NodeId.Id": "Id", - "HistoryReadResults.NodeId.Namespace": "Namespace", + "HistoryReadResults.NodeId.Namespace": "Namespace" } - return self._process_df(df_result, columns) - + return self._process_df(combined_df, columns) + + def get_historical_aggregated_values(self, *args, **kwargs): + result = self.helper.run_coroutine(self.get_historical_aggregated_values_asyn(*args, **kwargs)) + return result - async def get_historical_aggregated_values_async( - self, - start_time: datetime, - end_time: datetime, - pro_interval: int, - agg_name: str, - variable_list: List[Variables], - batch_size: int = 1000 - ) -> pd.DataFrame: - """Request historical aggregated values from the OPC UA server with batching""" - - # Configure the logging - logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') - - time_batches = self.generate_time_batches(start_time, end_time, pro_interval, batch_size) - variable_batches = self.generate_variable_batches(variable_list, batch_size) - - # Creating tasks for each API request and gathering the results - tasks = [] - - for time_batch_start, time_batch_end in time_batches: - for variable_sublist in variable_batches: - task = self.make_async_api_request(time_batch_start, time_batch_end, pro_interval, agg_name, variable_sublist) - tasks.append(asyncio.create_task(task)) - - # Execute all tasks concurrently and gather their results - responses = await asyncio.gather(*tasks) - - # Processing the API responses - result_list = [] - for idx, batch_response in enumerate(responses): - logging.info(f"Processing API response {idx+1}/{len(responses)}...") - batch_result = self.process_api_response(batch_response) - result_list.append(batch_result) - - logging.info("Concatenating results...") - result_df = pd.concat(result_list, ignore_index=True) - - return result_df - - - async def 
make_async_api_request(self, start_time: datetime, end_time: datetime, pro_interval: int, agg_name: str, variable_list: List[Variables]) -> dict: - """Make API request for the given time range and variable list""" - - # Creating a new variable list to remove pydantic models - vars = self._get_variable_list_as_list(variable_list) - extended_variables = [ - { - "NodeId": var, - "AggregateName": agg_name, - } - for var in vars - - ] - additional_params = { - "ProcessingInterval": pro_interval, - "AggregateName": agg_name - } - body = self._prepare_body( - start_time, - end_time, - extended_variables, - additional_params - ) - try: - # Make API request using aiohttp session - async with aiohttp.ClientSession() as session: - async with session.post( - f"{self.rest_url}values/historicalaggregated", - data=json.dumps(body, default=str), - headers=self.headers, - timeout=aiohttp.ClientTimeout(total=None) - ) as response: - response.raise_for_status() - content = await response.json() - except aiohttp.ClientResponseError as e: - if self.auth_client is not None: - self.check_auth_client(await e.json()) - else: - raise RuntimeError(f'Error message {e}') - - self._check_content(content) - return content - - - def generate_time_batches(self, start_time: datetime, end_time: datetime, pro_interval: int, batch_size: int) -> List[tuple]: - """Generate time batches based on start time, end time, processing interval, and batch size""" - - total_time_range = end_time - start_time - pro_interval_seconds = (pro_interval / 1000) - total_data_points = (total_time_range.total_seconds() // pro_interval_seconds) + 1 - - total_batches = math.ceil(total_data_points / batch_size) - actual_batch_size = math.ceil(total_data_points / total_batches) - - time_batches = [ - (start_time + timedelta(seconds=(i * actual_batch_size * pro_interval_seconds)), - start_time + timedelta(seconds=((i + 1) * actual_batch_size * pro_interval_seconds)) - timedelta(seconds=pro_interval_seconds)) - for i in range(total_batches) - ] - - return time_batches - - - def generate_variable_batches(self, variable_list: List[Variables], batch_size: int) -> List[List[Variables]]: - """Generate variable batches based on the variable list and batch size""" - - variable_batches = [ - variable_list[i:i + batch_size] for i in range(0, len(variable_list), batch_size) - ] - - return variable_batches - - - def process_api_response(self, content: dict) -> pd.DataFrame: - """Process the API response and return the result dataframe""" - - df_result_list = [] - for data in content["HistoryReadResults"]: - df = pd.json_normalize(data["DataValues"]) - - for i, j in data["NodeId"].items(): - df[f"HistoryReadResults.NodeId.{i}"] = j - - df_result_list.append(df) - - if df_result_list: - df_result = pd.concat(df_result_list) - df_result.reset_index(inplace=True, drop=True) - - columns = { - "Value.Type": "ValueType", - "Value.Body": "Value", - "StatusCode.Symbol": "StatusSymbol", - "StatusCode.Code": "StatusCode", - "SourceTimestamp": "Timestamp", - "HistoryReadResults.NodeId.IdType": "IdType", - "HistoryReadResults.NodeId.Id": "Id", - "HistoryReadResults.NodeId.Namespace": "Namespace", - } - - return self._process_df(df_result, columns) - else: - return pd.DataFrame() - - def write_values(self, variable_list: List[WriteVariables]) -> List: """Request to write realtime values to the OPC UA server diff --git a/src/pyprediktormapclient/shared.py b/src/pyprediktormapclient/shared.py index 4c11cff..09511f9 100644 --- a/src/pyprediktormapclient/shared.py +++ 
b/src/pyprediktormapclient/shared.py @@ -1,7 +1,8 @@ +import asyncio +import aiohttp import requests -from pydantic import AnyUrl -from typing import Literal -from pydantic import ValidationError +from pydantic import AnyUrl, ValidationError +from typing import Literal, Dict, Any class Config: arbitrary_types_allowed = True diff --git a/tests/test_auth_client.py b/tests/test_auth_client.py index 994a603..6d4ff90 100644 --- a/tests/test_auth_client.py +++ b/tests/test_auth_client.py @@ -413,7 +413,7 @@ def test_get_self_service_login_token_unsuccessful(self, mock_get): def test_get_self_service_token_expired(self): auth_client = AUTH_CLIENT(rest_url=URL, username=username, password=password) auth_client.token = Token(session_token=auth_session_id, expires_at=auth_expires_at_2hrs_ago) - auth_client.token.expires_at = datetime.datetime.utcnow() - datetime.timedelta(hours=2) + auth_client.token.expires_at = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2) token_expired = auth_client.check_if_token_has_expired() assert token_expired == True diff --git a/tests/test_opc_ua.py b/tests/test_opc_ua.py index 19f56ea..cf8a991 100644 --- a/tests/test_opc_ua.py +++ b/tests/test_opc_ua.py @@ -1,9 +1,8 @@ import unittest from unittest import mock -from unittest.mock import patch import pytest -import pydantic import datetime +import aiohttp import pandas.api.types as ptypes from pydantic import ValidationError, AnyUrl, BaseModel from typing import List @@ -235,6 +234,59 @@ ] } +successful_raw_historical_result = { + "Success": True, + "ErrorMessage": "", + "ErrorCode": 0, + "ServerNamespaces": ["string"], + "HistoryReadResults": [ + { + "NodeId": { + "IdType": 2, + "Id": "SOMEID", + "Namespace": 1, + }, + "StatusCode": {"Code": 0, "Symbol": "Good"}, + "DataValues": [ + { + "Value": {"Type": 11, "Body": 34.28500000000003}, + "SourceTimestamp": "2022-09-13T13:39:51Z", + }, + { + "Value": {"Type": 11, "Body": 35.12345678901234}, + "SourceTimestamp": "2022-09-13T13:40:51Z", + }, + { + "Value": {"Type": 11, "Body": 33.98765432109876}, + "SourceTimestamp": "2022-09-13T13:41:51Z", + }, + ], + }, + { + "NodeId": { + "IdType": 2, + "Id": "SOMEID2", + "Namespace": 1, + }, + "StatusCode": {"Code": 0, "Symbol": "Good"}, + "DataValues": [ + { + "Value": {"Type": 11, "Body": 6.441666666666666}, + "SourceTimestamp": "2022-09-13T13:39:51Z", + }, + { + "Value": {"Type": 11, "Body": 6.523456789012345}, + "SourceTimestamp": "2022-09-13T13:40:51Z", + }, + { + "Value": {"Type": 11, "Body": 6.345678901234567}, + "SourceTimestamp": "2022-09-13T13:41:51Z", + }, + ], + } + ] +} + successful_write_live_response = { "Success": True, "ErrorMessage": "string", @@ -346,7 +398,6 @@ def successful_mocked_requests(*args, **kwargs): response.json_data = json_data return response - def empty_values_mocked_requests(*args, **kwargs): if args[0] == f"{URL}values/get": return MockResponse(empty_live_response, 200) @@ -495,50 +546,10 @@ def empty_mocked_requests(*args, **kwargs): return MockResponse(None, 404) -def successful_mocked_historical_requests(*args, **kwargs): - if args[0] == f"{URL}values/historicalaggregated": - return MockResponse(successful_historical_result, 200) - - return MockResponse(None, 404) - -def no_dict_mocked_historical_requests(*args, **kwargs): - if args[0] == f"{URL}values/historicalaggregated": - return MockResponse([], 200) - - return MockResponse(None, 404) - -def unsuccessful_mocked_historical_requests(*args, **kwargs): - if args[0] == f"{URL}values/historicalaggregated": - unsuc = 
deepcopy(successful_historical_result) - unsuc.pop("HistoryReadResults") - return MockResponse([unsuc], 200) - - return MockResponse(None, 404) - -def no_historical_result_mocked_historical_requests(*args, **kwargs): - if args[0] == f"{URL}values/historicalaggregated": - unsuc = deepcopy(successful_historical_result) - unsuc["Success"] = False - return MockResponse([unsuc], 200) - - return MockResponse(None, 404) - -def make_historical_request(): - tsdata = OPC_UA(rest_url=URL, opcua_url=OPC_URL) - return tsdata.get_historical_aggregated_values( - start_time=(datetime.datetime.now() - datetime.timedelta(30)), - end_time=(datetime.datetime.now() - datetime.timedelta(29)), - pro_interval=3600000, - agg_name="Average", - variable_list=list_of_ids, - ) - - class AnyUrlModel(BaseModel): url: AnyUrl -# Our test case class -class OPCUATestCase(unittest.TestCase): +class TestOPCUA(unittest.TestCase): def test_malformed_rest_url(self): with pytest.raises(ValidationError): AnyUrlModel(rest_url="not_an_url", opcua_url=OPC_URL) @@ -680,29 +691,6 @@ def test_get_live_values_no_status_code(self, mock_get): result = tsdata.get_values(list_of_ids) assert result[0]["StatusCode"] == None - @mock.patch("requests.post", side_effect=successful_mocked_historical_requests) - def test_historical_values_success(self, mock_get): - result = make_historical_request() - cols_to_check = ["Value"] - assert all(ptypes.is_numeric_dtype(result[col]) for col in cols_to_check) - assert result['Value'].tolist() == [34.28500000000003, 6.441666666666666, 34.28500000000003, 6.441666666666666] - assert result['ValueType'].tolist() == ["Double", "Double", "Double", "Double"] - - @mock.patch("requests.post", side_effect=no_dict_mocked_historical_requests) - def test_historical_values_no_dict(self, mock_get): - with pytest.raises(RuntimeError): - make_historical_request() - - @mock.patch("requests.post", side_effect=unsuccessful_mocked_historical_requests) - def test_historical_values_unsuccess(self, mock_get): - with pytest.raises(RuntimeError): - make_historical_request() - - @mock.patch("requests.post", side_effect=no_historical_result_mocked_historical_requests) - def test_historical_values_no_hist(self, mock_get): - with pytest.raises(RuntimeError): - make_historical_request() - @mock.patch("requests.post", side_effect=successful_write_mocked_requests) def test_write_live_values_successful(self, mock_get): tsdata = OPC_UA(rest_url=URL, opcua_url=OPC_URL) @@ -787,5 +775,115 @@ def test_get_write_historical_values_successful_with_error_codes(self, mock_get) assert result[0]["WriteError"]["Code"] == successfull_write_historical_response_with_errors["HistoryUpdateResults"][0]["StatusCode"]["Code"] assert result[0]["WriteError"]["Symbol"] == successfull_write_historical_response_with_errors["HistoryUpdateResults"][0]["StatusCode"]["Symbol"] +class AsyncMockResponse: + def __init__(self, json_data, status_code): + self.json_data = json_data + self.status = status_code + self.headers = {'Content-Type': 'application/json'} + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + pass + + async def json(self): + return self.json_data + + async def raise_for_status(self): + if self.status >= 400: + raise aiohttp.ClientResponseError( + request_info=aiohttp.RequestInfo( + url=URL, + method="POST", + headers={}, + real_url=OPC_URL + ), + history=(), + status=self.status, + message="Mocked error", + headers=self.headers + ) + +def unsuccessful_async_mock_response(*args, **kwargs): + return 
AsyncMockResponse( + json_data=None, + status_code=400 + ) + +async def make_historical_request(): + tsdata = OPC_UA(rest_url=URL, opcua_url=OPC_URL) + return await tsdata.get_historical_aggregated_values_asyn( + start_time=(datetime.datetime.now() - datetime.timedelta(30)), + end_time=(datetime.datetime.now() - datetime.timedelta(29)), + pro_interval=3600000, + agg_name="Average", + variable_list=list_of_ids, + ) + +async def make_raw_historical_request(): + tsdata = OPC_UA(rest_url=URL, opcua_url=OPC_URL) + return await tsdata.get_raw_historical_values_asyn( + start_time=(datetime.datetime.now() - datetime.timedelta(30)), + end_time=(datetime.datetime.now() - datetime.timedelta(29)), + variable_list=list_of_ids, + ) + +@pytest.mark.asyncio +class TestAsyncOPCUA: + + @mock.patch("aiohttp.ClientSession.post") + async def test_historical_values_success(self, mock_post): + mock_post.return_value = AsyncMockResponse( + json_data=successful_historical_result, + status_code=200 + ) + result = await make_historical_request() + cols_to_check = ["Value"] + assert all(ptypes.is_numeric_dtype(result[col]) for col in cols_to_check) + assert result['Value'].tolist() == [34.28500000000003, 6.441666666666666, 34.28500000000003, 6.441666666666666] + assert result['ValueType'].tolist() == ["Double", "Double", "Double", "Double"] + + @mock.patch("aiohttp.ClientSession.post") + async def test_historical_values_no_dict(self, mock_post): + with pytest.raises(RuntimeError): + await make_historical_request() + + @mock.patch("aiohttp.ClientSession.post") + async def test_historical_values_unsuccess(self, mock_post): + mock_post.return_value = unsuccessful_async_mock_response() + with pytest.raises(RuntimeError): + await make_historical_request() + + @mock.patch("aiohttp.ClientSession.post") + async def test_historical_values_no_hist(self, mock_post): + with pytest.raises(RuntimeError): + await make_historical_request() + + @mock.patch("aiohttp.ClientSession.post") + async def test_raw_historical_values_success(self, mock_post): + mock_post.return_value = AsyncMockResponse( + json_data=successful_raw_historical_result, + status_code=200 + ) + result = await make_raw_historical_request() + cols_to_check = ["Value"] + assert all(ptypes.is_numeric_dtype(result[col]) for col in cols_to_check) + + @mock.patch("aiohttp.ClientSession.post") + async def test_raw_historical_values_no_dict(self, mock_post): + with pytest.raises(RuntimeError): + await make_raw_historical_request() + + @mock.patch("aiohttp.ClientSession.post") + async def test_raw_historical_values_unsuccess(self, mock_post): + with pytest.raises(RuntimeError): + await make_raw_historical_request() + + @mock.patch("aiohttp.ClientSession.post") + async def test_raw_historical_values_no_hist(self, mock_post): + with pytest.raises(RuntimeError): + await make_raw_historical_request() + if __name__ == "__main__": unittest.main() diff --git a/tox.ini b/tox.ini index 92afe88..5763dcb 100644 --- a/tox.ini +++ b/tox.ini @@ -16,9 +16,17 @@ passenv = SETUPTOOLS_* extras = testing +deps = + pytest + pytest-cov + pytest-asyncio commands = pytest -s --cov pyprediktormapclient --cov-report term-missing +[pytest] +markers = + asyncio: mark a test as an asyncio test. + # # To run `tox -e lint` you need to make sure you have a # # `.pre-commit-config.yaml` file. See https://pre-commit.com
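
For reviewers, here is a standalone sketch (not part of the patch; every name in it is invented for illustration) of the general pattern the new `get_historical_values` / `_make_request` code in `opc_ua.py` follows: request batches are dispatched concurrently under an `asyncio.Semaphore`, and each batch is retried with exponential backoff (the patch defaults to 3 retries, a 5-second base delay, and 30 concurrent requests). The demo values below are deliberately small so it runs quickly.

```python
import asyncio
import logging

# Illustrative-only constants; the library defaults in the patch are
# max_retries=3, retry_delay=5 s, max_concurrent_requests=30.
MAX_RETRIES = 3
RETRY_DELAY = 0.1        # seconds, doubled on every failed attempt
MAX_CONCURRENT = 5       # upper bound on in-flight requests

attempt_counts: dict[int, int] = {}


async def flaky_request(batch_id: int) -> dict:
    """Stand-in for an HTTP POST that fails on its first attempt for odd ids."""
    attempt_counts[batch_id] = attempt_counts.get(batch_id, 0) + 1
    if batch_id % 2 and attempt_counts[batch_id] == 1:
        raise RuntimeError(f"transient error in batch {batch_id}")
    await asyncio.sleep(0.01)
    return {"batch": batch_id, "rows": 10}


async def fetch_batch(batch_id: int, semaphore: asyncio.Semaphore) -> dict:
    """Retry one batch with exponential backoff, bounded by the semaphore."""
    async with semaphore:
        for attempt in range(MAX_RETRIES):
            try:
                return await flaky_request(batch_id)
            except RuntimeError as exc:
                if attempt == MAX_RETRIES - 1:
                    raise
                wait = RETRY_DELAY * (2 ** attempt)
                logging.warning("batch %s failed (%s), retrying in %.1fs", batch_id, exc, wait)
                await asyncio.sleep(wait)


async def main() -> None:
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    tasks = [fetch_batch(i, semaphore) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(results)


if __name__ == "__main__":
    asyncio.run(main())
```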
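
The patch also exposes the async methods through blocking wrappers (`get_historical_aggregated_values`, `get_raw_historical_values`) via `AsyncIONotebookHelper` and `nest_asyncio`, which is why `nest_asyncio` appears in `requirements.txt` and `setup.cfg`. A minimal sketch of that wrapper pattern, under the assumption that the caller may already be inside a running event loop (e.g. a Jupyter kernel); `fetch_async` and `fetch` are hypothetical names, not part of the library:

```python
import asyncio

import nest_asyncio

# Allows run_until_complete() to be called even when a loop is already
# running, which is the situation inside a notebook kernel.
nest_asyncio.apply()


async def fetch_async(x: int) -> int:
    """Async implementation; stands in for an aiohttp-based request."""
    await asyncio.sleep(0.01)
    return x * 2


def fetch(x: int) -> int:
    """Blocking facade over the async implementation, mirroring the
    run_coroutine() helper added in the patch."""
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(fetch_async(x))


if __name__ == "__main__":
    print(fetch(21))  # prints 42
```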