From 843d4366de3708308be01be64d3e4c8b63faba35 Mon Sep 17 00:00:00 2001 From: cefili Date: Tue, 14 Nov 2023 13:13:14 +0100 Subject: [PATCH 1/8] Added IoT class. --- doc/best_practices.md | 3 + python-lib/tc_etl_lib/README.md | 21 ++ python-lib/tc_etl_lib/tc_etl_lib/__init__.py | 4 +- python-lib/tc_etl_lib/tc_etl_lib/cb.py | 29 +-- .../tc_etl_lib/tc_etl_lib/exceptions.py | 50 ++++ python-lib/tc_etl_lib/tc_etl_lib/iot.py | 102 ++++++++ python-lib/tc_etl_lib/tc_etl_lib/test_iot.py | 223 ++++++++++++++++++ 7 files changed, 404 insertions(+), 28 deletions(-) create mode 100644 python-lib/tc_etl_lib/tc_etl_lib/exceptions.py create mode 100644 python-lib/tc_etl_lib/tc_etl_lib/iot.py create mode 100644 python-lib/tc_etl_lib/tc_etl_lib/test_iot.py diff --git a/doc/best_practices.md b/doc/best_practices.md index c93a501..616794d 100644 --- a/doc/best_practices.md +++ b/doc/best_practices.md @@ -48,6 +48,9 @@ Las funciones que actualmente soporta la librería son: - modulo `cb`: Módulo que incluye funciones relacionadas con la comunicación con el Context Broker. - `send_batch`: Función que envía un lote de entidades al Context Broker. Recibe un listado con todos los tokens por subservicio y usa el correspondiente para realizar la llamada al Context Broker. Si no se dispone de token o ha caducado, se solicita o renueva el token según el caso y luego envía los datos. - `get_entities_page`: Función que permite la recogida de datos del Context Broker. Permite el uso de ciertos parámetros como offset, limit, orderBy y type para filtrar la recogida de datos. + - modulo `iot`: Módulo que incluye funciones relacionadas con la comunicación del IoT Agent. + - `send_json`: Función que envía un JSON al IoT Agent. Recibe el nombre del sensor, su API key correspondiente, la URL del servidor y los datos a enviar. + - `send_batch`: Función que envía una lista de diccionarios al IoT Agent. Los envía uno por uno. Recibe el nombre del sensor, su API key correspondiente, la URL del servidor, el tiempo de espera en segundos entre envío y envío y los datos a enviar. Se puede encontrar más detalles de la librería en la documentación de esta. [Ref.](../python-lib/tc_etl_lib/README.md) diff --git a/python-lib/tc_etl_lib/README.md b/python-lib/tc_etl_lib/README.md index 82b3d22..b15738c 100644 --- a/python-lib/tc_etl_lib/README.md +++ b/python-lib/tc_etl_lib/README.md @@ -343,6 +343,27 @@ La librería está creada con diferentes clases dependiendo de la funcionalidad - Reemplaza todos los espacios en blanco consecutivos por el carácter de reemplazo. - NOTA: Esta función no recorta la longitud de la cadena devuelta a 256 caracteres, porque el llamante puede querer conservar la cadena entera para por ejemplo guardarla en algún otro atributo, antes de truncarla. +- Clase `IoT`: En esta clase están las funciones relacionadas con el IoT Agent. + + - `__init__`: constructor de objetos de la clase. + - `send_json`: Función que envía un archivo en formato JSON al IoT Agent. + - :param obligatorio `sensor_name`: El nombre del sensor. + - :param obligatorio `api_key`: La API key correspondiente al sensor. + - :param obligatorio `req_url`: La URL del servicio al que se le quiere enviar los datos. + - :param obligatorio: `data`: Datos a enviar. La estructura debe tener pares de elementos clave-valor (diccionario). + - :raises ValueError: Se lanza cuando los datos a enviar son distintos a un único diccionario. + - :raises Excepction: Se lanza una excepción ConnectionError cuando no puede conectarse al servidor. Se lanza una excepción FetchError cuando se produce un error en en la solicitud HTTP. + - :return: True si el envío de datos es exitoso. + - `send_batch`: Función que envía un archivo en formato JSON al IoT Agent. + - :param obligatorio `sensor_name`: El nombre del sensor. + - :param obligatorio `api_key`: La API key correspondiente al sensor. + - :param obligatorio `req_url`: La URL del servicio al que se le quiere enviar los datos. + - :param obligatorio: `time_sleep`: Es el tiempo de espera entre cada envío de datos en segundos. + - :param obligatorio: `data`: Datos a enviar. La estructura debe tener pares de elementos clave-valor (diccionario).t + - :raises ValueError: Se lanza cuando el tipo de los datos a enviar es incorrecto. + - :raises Excepction: Se lanza una excepción ConnectionError cuando no puede conectarse al servidor. Se lanza una excepción FetchError cuando se produce un error en en la solicitud HTTP. + - :return: True si el envío de datos es exitoso. + Algunos ejemplos de uso de `normalizer`: ``` diff --git a/python-lib/tc_etl_lib/tc_etl_lib/__init__.py b/python-lib/tc_etl_lib/tc_etl_lib/__init__.py index 614d69e..695ee29 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/__init__.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/__init__.py @@ -20,6 +20,8 @@ # from .auth import authManager -from .cb import FetchError, cbManager +from .cb import cbManager +from .exceptions import FetchError +from .iot import IoT from .store import Store, orionStore, sqlFileStore from .normalizer import normalizer diff --git a/python-lib/tc_etl_lib/tc_etl_lib/cb.py b/python-lib/tc_etl_lib/tc_etl_lib/cb.py index 39f9946..29cc07c 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/cb.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/cb.py @@ -34,7 +34,7 @@ import time import json -from . import authManager +from . import authManager, exceptions # control urllib3 post and get verify in false import urllib3, urllib3.exceptions @@ -42,31 +42,6 @@ logger = logging.getLogger(__name__) -class FetchError(Exception): - """ - FetchError encapsulates all parameters of an HTTP request and the erroneous response - """ - - response: requests.Response - method: str - url: str - params: Optional[Any] = None - headers: Optional[Any] = None - body: Optional[Any] = None - - def __init__(self, response: requests.Response, method: str, url: str, params: Optional[Any] = None, headers: Optional[Any] = None, body: Optional[Any] = None): - """Constructor for FetchError class""" - self.response = response - self.method = method - self.url = url - self.params = params - self.headers = headers - self.body = body - - def __str__(self) -> str: - return f"Failed to {self.method} {self.url} (headers: {self.headers}, params: {self.params}, body: {self.body}): [{self.response.status_code}] {self.response.text}" - - class cbManager: """ContextBroker Manager @@ -260,7 +235,7 @@ def get_entities_page(self, *, service: Optional[str] = None, subservice: Option respjson = resp.json() logger.error(f'{respjson["name"]}: {respjson["message"]}') if resp.status_code < 200 or resp.status_code > 204: - raise FetchError(response=resp, method="GET", url=req_url, params=params, headers=headers) + raise exceptions.FetchError(response=resp, method="GET", url=req_url, params=params, headers=headers) return resp.json() diff --git a/python-lib/tc_etl_lib/tc_etl_lib/exceptions.py b/python-lib/tc_etl_lib/tc_etl_lib/exceptions.py new file mode 100644 index 0000000..7ec05a3 --- /dev/null +++ b/python-lib/tc_etl_lib/tc_etl_lib/exceptions.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2022 Telefónica Soluciones de Informática y Comunicaciones de España, S.A.U. +# +# This file is part of tc_etl_lib +# +# tc_etl_lib is free software: you can redistribute it and/or +# modify it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# tc_etl_lib is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +# General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with IoT orchestrator. If not, see http://www.gnu.org/licenses/. + +""" +Exceptions handling for Python. +""" + +import requests +from typing import Any, Optional + +class FetchError(Exception): + """ + FetchError encapsulates all parameters of an HTTP request and the erroneous response. + """ + + response: requests.Response + method: str + url: str + params: Optional[Any] = None + headers: Optional[Any] = None + body: Optional[Any] = None + + def __init__(self, response: requests.Response, method: str, url: str, params: Optional[Any] = None, headers: Optional[Any] = None, body: Optional[Any] = None): + """Constructor for FetchError class.""" + self.response = response + self.method = method + self.url = url + self.params = params + self.headers = headers + self.body = body + + def __str__(self) -> str: + return f"Failed to {self.method} {self.url} (headers: {self.headers}, params: {self.params}, body: {self.body}): [{self.response.status_code}] {self.response.text}" \ No newline at end of file diff --git a/python-lib/tc_etl_lib/tc_etl_lib/iot.py b/python-lib/tc_etl_lib/tc_etl_lib/iot.py new file mode 100644 index 0000000..342fee4 --- /dev/null +++ b/python-lib/tc_etl_lib/tc_etl_lib/iot.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2022 Telefónica Soluciones de Informática y Comunicaciones de España, S.A.U. +# +# This file is part of tc_etl_lib +# +# tc_etl_lib is free software: you can redistribute it and/or +# modify it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# tc_etl_lib is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +# General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with IoT orchestrator. If not, see http://www.gnu.org/licenses/. + +''' +IoT routines for Python: + - iot.send_json + - iot.send_batch +''' + +from . import exceptions +import requests +import tc_etl_lib as tc +import time +from typing import Any, Iterable + + +class IoT: + """IoT is a class that allows us to communicate with the IoT Agent.""" + + def __init__(self): + pass + + def send_json(self, + sensor_name: str, + api_key: str, + req_url: str, + data: Any) -> None: + params = { + 'i': sensor_name, + 'k': api_key + } + headers = { + "Content-Type": "application/json" + } + + try: + # Verify if data is a single dictionary. + if isinstance(data, dict): + resp = requests.post(url=req_url, json=data, + params=params, headers=headers) + if resp.status_code == 200: + return True + else: + raise exceptions.FetchError( + response=resp, + method="POST", + url=req_url, + params=params, + headers=headers) + else: + raise ValueError( + "The parameter 'data' should be a single dictionary {}.") + except requests.exceptions.ConnectionError as e: + raise e + + def send_batch(self, + sensor_name: str, + api_key: str, + req_url: str, + time_sleep: float, + data: Iterable[Any]) -> None: + params = { + 'i': sensor_name, + 'k': api_key + } + headers = { + "Content-Type": "application/json" + } + + try: + for i in range(0, len(data)): + resp = requests.post( + url=req_url, json=data[i], params=params, headers=headers) + + if resp.status_code == 200: + time.sleep(time_sleep) + return True + else: + raise exceptions.FetchError(response=resp, + method="POST", + url=req_url, + params=params, + headers=headers) + except requests.exceptions.ConnectionError as e: + raise e \ No newline at end of file diff --git a/python-lib/tc_etl_lib/tc_etl_lib/test_iot.py b/python-lib/tc_etl_lib/tc_etl_lib/test_iot.py new file mode 100644 index 0000000..e1c7033 --- /dev/null +++ b/python-lib/tc_etl_lib/tc_etl_lib/test_iot.py @@ -0,0 +1,223 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2023 Telefónica Soluciones de Informática y Comunicaciones de España, S.A.U. +# +# This file is part of tc_etl_lib +# +# tc_etl_lib is free software: you can redistribute it and/or +# modify it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# tc_etl_lib is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +# General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with IoT orchestrator. If not, see http://www.gnu.org/licenses/. + +''' +IoT tests +''' + +from . import IoT, exceptions +import pytest +import requests +import unittest +from unittest.mock import patch, Mock + + +class TestIoT(unittest.TestCase): + def test_send_json_success(self): + """A success message should be displayed when + HTTP request is executed successfully.""" + iot = IoT() + with patch('requests.post') as mock_post: + fake_response = Mock() + # Simulates a successful code status. + fake_response.status_code = 200 + mock_post.return_value = fake_response + resp = iot.send_json("fake_sensor", "fake_api_key", + "http://fakeurl.com", {"key": "value"}) + assert resp == True + + + def test_send_json_connection_error(self): + """Should raise an exception if there is a server connection error.""" + iot = IoT() + with patch('requests.post') as mock_post: + mock_post.side_effect = requests.exceptions.ConnectionError() + with pytest.raises(requests.exceptions.ConnectionError): + iot.send_json( + "fake_sensor_name", + "fake_api_key", + "http://fakeurl.com", + {"key": "value"}) + + + def test_send_json_invalid_data_type(self): + """Should raise an exception if the data type is icorrect.""" + iot = IoT() + with pytest.raises(ValueError) as exc_info: + iot.send_json("fake_sensor_name", + "fake_api_key", + "http://fakeurl.com", + ["data"]) + exception_message = str(exc_info.value) + assert "The parameter 'data' should be a single dictionary {}." in str( + exception_message) + + + def test_send_json_set_not_unique(self): + """Should raise an exception if the data is an array of dictionaries.""" + iot = IoT() + with pytest.raises(ValueError) as exc_info: + iot.send_json("fake_sensor_name", "fake_api_key", + "http://fakeurl.com", + [ + {"key_1": "value_1"}, + {"key_2": "value_2"}]) + exception_message = str(exc_info.value) + assert "The parameter 'data' should be a single dictionary {}." in str( + exception_message) + + + def test_send_json_unauthorized(self): + """Should raise an exception if the request is unauthorized.""" + iot = IoT() + with patch('requests.post') as mock_post: + mock_post.return_value.status_code = 401 + with pytest.raises(exceptions.FetchError) as exc_info: + iot.send_json("fake_sensor_name", + "fake_api_key", + "http://fakeurl.com", + {"key": "value"}) + exception_raised = str(exc_info.value) + assert "401" in exception_raised + assert "Failed to POST http://fakeurl.com" in exception_raised + + + def test_send_json_not_found(self): + """Should raise an exception if the request is not found.""" + iot = IoT() + with patch('requests.post') as mock_post: + mock_post.return_value.status_code = 404 + with pytest.raises(exceptions.FetchError) as exc_info: + iot.send_json("fake_sensor_name", "fake_api_key", + "http://fakeurl.com", {"key": "value"}) + + exception_raised = str(exc_info.value) + assert "404" in exception_raised + assert "Failed to POST http://fakeurl.com" in exception_raised + + + def test_send_json_server_error(self): + """Should raise an exception if there is a server error.""" + iot = IoT() + with patch('requests.post') as mock_post: + mock_post.return_value.status_code = 500 + with pytest.raises(exceptions.FetchError) as exc_info: + iot.send_json("fake_sensor_name", + "fake_api_key", + "http://fakeurl.com", + {"key": "value"}) + + exception_raised = str(exc_info.value) + assert "500" in exception_raised + assert "Failed to POST http://fakeurl.com" in exception_raised + + + def test_send_batch_success(self): + """The status code should be 200 if a request is success.""" + iot = IoT() + with patch('requests.post') as mock_post: + fake_response = Mock() + # Simulates a successful status code. + fake_response.status_code = 200 + mock_post.return_value = fake_response + resp = iot.send_batch("fake_sensor", + "fake_api_key", + "http://fakeurl.com", + 0.25, + [ + {"key_1": "value_1"}, + {"key_2", "value_2"} + ]) + + + assert resp == True + + + def test_send_batch_connection_error(self): + """Should raise an exception if there is a connection error.""" + iot = IoT() + with patch('requests.post') as mock_post: + mock_post.side_effect = requests.exceptions.ConnectionError() + with pytest.raises(requests.exceptions.ConnectionError): + iot.send_batch( + "fake_sensor_name", + "fake_api_key", + "http://fakeurl.com", + 0.25, + [ + {"key_1": "value_1"}, + {"key_2": "value_2"} + ]) + + + + def test_send_batch_unauthorized(self): + """Should raise an exception if the request is unauthorized.""" + iot = IoT() + with patch('requests.post') as mock_post: + mock_post.return_value.status_code = 401 + with pytest.raises(exceptions.FetchError) as exc_info: + iot.send_batch("fake_sensor_name", + "fake_api_key", + "http://fakeurl.com", + 0.25, + [ + {"key_1": "value_1"}, + {"key_2": "value_2"}]) + exception_raised = str(exc_info.value) + assert "401" in exception_raised + assert "Failed to POST http://fakeurl.com" in exception_raised + + + def test_send_batch_not_found(self): + """Should raise an exception if the request is not found.""" + iot = IoT() + with patch('requests.post') as mock_post: + mock_post.return_value.status_code = 404 + with pytest.raises(exceptions.FetchError) as exc_info: + iot.send_batch("fake_sensor_name", + "fake_api_key", + "http://fakeurl.com", + 0.25, + [ + {"key_1": "value_1"}, + {"key_2": "value_2"}]) + + exception_raised = str(exc_info.value) + assert "404" in exception_raised + assert "Failed to POST http://fakeurl.com" in exception_raised + + + def test_send_batch_server_error(self): + """Should raise an exception if there is a server error.""" + iot = IoT() + with patch('requests.post') as mock_post: + mock_post.return_value.status_code = 500 + with pytest.raises(exceptions.FetchError) as exc_info: + iot.send_batch("fake_sensor_name", + "fake_api_key", + "http://fakeurl.com", + 0.25, + [ + {"key_1": "value_1"}, + {"key_2": "value_2"}]) + + exception_raised = str(exc_info.value) + assert "500" in exception_raised + assert "Failed to POST http://fakeurl.com" in exception_raised \ No newline at end of file From 3fa42c5f74cd15f5757a9e70383c50ab83e9d828 Mon Sep 17 00:00:00 2001 From: cefili Date: Tue, 21 Nov 2023 09:31:29 +0100 Subject: [PATCH 2/8] =?UTF-8?q?send=5Fbatch=20puede=20recibir=20un=20DataF?= =?UTF-8?q?rame.=20Se=20a=C3=B1adieron=20pruebas=20unitarias.=20Mejora=20e?= =?UTF-8?q?n=20el=20manejo=20de=20excepciones.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/best_practices.md | 6 +- .../{requeriments.txt => requirements.txt} | 3 +- python-lib/tc_etl_lib/README.md | 15 +- python-lib/tc_etl_lib/tc_etl_lib/iot.py | 91 +++++----- python-lib/tc_etl_lib/tc_etl_lib/test_iot.py | 157 ++++++------------ 5 files changed, 112 insertions(+), 160 deletions(-) rename etls/hello-world/{requeriments.txt => requirements.txt} (59%) diff --git a/doc/best_practices.md b/doc/best_practices.md index 616794d..bfd9b36 100644 --- a/doc/best_practices.md +++ b/doc/best_practices.md @@ -48,9 +48,9 @@ Las funciones que actualmente soporta la librería son: - modulo `cb`: Módulo que incluye funciones relacionadas con la comunicación con el Context Broker. - `send_batch`: Función que envía un lote de entidades al Context Broker. Recibe un listado con todos los tokens por subservicio y usa el correspondiente para realizar la llamada al Context Broker. Si no se dispone de token o ha caducado, se solicita o renueva el token según el caso y luego envía los datos. - `get_entities_page`: Función que permite la recogida de datos del Context Broker. Permite el uso de ciertos parámetros como offset, limit, orderBy y type para filtrar la recogida de datos. - - modulo `iot`: Módulo que incluye funciones relacionadas con la comunicación del IoT Agent. - - `send_json`: Función que envía un JSON al IoT Agent. Recibe el nombre del sensor, su API key correspondiente, la URL del servidor y los datos a enviar. - - `send_batch`: Función que envía una lista de diccionarios al IoT Agent. Los envía uno por uno. Recibe el nombre del sensor, su API key correspondiente, la URL del servidor, el tiempo de espera en segundos entre envío y envío y los datos a enviar. + - modulo `iot`: Módulo que incluye funciones relacionadas con la comunicación del agente IoT. + - `send_json`: Función que envía un JSON al agente IoT. Recibe el nombre del sensor, su API key correspondiente, la URL del servidor y los datos a enviar. + - `send_batch`: Función que envía un archivo en formato JSON al agente IoT. Puede recibir una lista de diccionarios o un DataFrame. En el caso de DataFrames, convierte cada fila en un diccionario y los envía uno por uno. Recibe el nombre del sensor, su API key correspondiente, la URL del servidor, el tiempo de espera en segundos entre envío y envío y los datos a enviar. Se puede encontrar más detalles de la librería en la documentación de esta. [Ref.](../python-lib/tc_etl_lib/README.md) diff --git a/etls/hello-world/requeriments.txt b/etls/hello-world/requirements.txt similarity index 59% rename from etls/hello-world/requeriments.txt rename to etls/hello-world/requirements.txt index 85fa9b0..64493d6 100644 --- a/etls/hello-world/requeriments.txt +++ b/etls/hello-world/requirements.txt @@ -1,2 +1,3 @@ # Include here the dependencies your ETL needs -#tc_etl_lib==0.9.0 \ No newline at end of file +#tc_etl_lib==0.9.0 +pandas==2.0.3 \ No newline at end of file diff --git a/python-lib/tc_etl_lib/README.md b/python-lib/tc_etl_lib/README.md index b15738c..351f16a 100644 --- a/python-lib/tc_etl_lib/README.md +++ b/python-lib/tc_etl_lib/README.md @@ -343,26 +343,27 @@ La librería está creada con diferentes clases dependiendo de la funcionalidad - Reemplaza todos los espacios en blanco consecutivos por el carácter de reemplazo. - NOTA: Esta función no recorta la longitud de la cadena devuelta a 256 caracteres, porque el llamante puede querer conservar la cadena entera para por ejemplo guardarla en algún otro atributo, antes de truncarla. -- Clase `IoT`: En esta clase están las funciones relacionadas con el IoT Agent. +- Clase `IoT`: En esta clase están las funciones relacionadas con el agente IoT. - `__init__`: constructor de objetos de la clase. - - `send_json`: Función que envía un archivo en formato JSON al IoT Agent. + - `send_json`: Función que envía un archivo en formato JSON al agente IoT. - :param obligatorio `sensor_name`: El nombre del sensor. - :param obligatorio `api_key`: La API key correspondiente al sensor. - :param obligatorio `req_url`: La URL del servicio al que se le quiere enviar los datos. - :param obligatorio: `data`: Datos a enviar. La estructura debe tener pares de elementos clave-valor (diccionario). - :raises ValueError: Se lanza cuando los datos a enviar son distintos a un único diccionario. - :raises Excepction: Se lanza una excepción ConnectionError cuando no puede conectarse al servidor. Se lanza una excepción FetchError cuando se produce un error en en la solicitud HTTP. + - :raises ValueError: Se lanza cuando el tipo de los datos a enviar es incorrecto. + - :raises Excepction: Se lanza una excepción ConnectionError cuando no puede conectarse al servidor. Se lanza una excepción FetchError cuando se produce un error en en la solicitud HTTP. - :return: True si el envío de datos es exitoso. - - `send_batch`: Función que envía un archivo en formato JSON al IoT Agent. + - `send_batch`: Función que envía un archivo en formato JSON al agente IoT. - :param obligatorio `sensor_name`: El nombre del sensor. - :param obligatorio `api_key`: La API key correspondiente al sensor. - :param obligatorio `req_url`: La URL del servicio al que se le quiere enviar los datos. - :param obligatorio: `time_sleep`: Es el tiempo de espera entre cada envío de datos en segundos. - - :param obligatorio: `data`: Datos a enviar. La estructura debe tener pares de elementos clave-valor (diccionario).t - - :raises ValueError: Se lanza cuando el tipo de los datos a enviar es incorrecto. - - :raises Excepction: Se lanza una excepción ConnectionError cuando no puede conectarse al servidor. Se lanza una excepción FetchError cuando se produce un error en en la solicitud HTTP. - - :return: True si el envío de datos es exitoso. + - :param obligatorio: `data`: Datos a enviar. Puede ser una lista de diccionarios o un DataFrame. + - :raises SendBatchError: Se levanta cuando se produce una excepción dentro de `send_json`. Atrapa la excepción original y se guarda y se imprime el índice donde se produjo el error. + Algunos ejemplos de uso de `normalizer`: diff --git a/python-lib/tc_etl_lib/tc_etl_lib/iot.py b/python-lib/tc_etl_lib/tc_etl_lib/iot.py index 342fee4..bd183ba 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/iot.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/iot.py @@ -25,11 +25,18 @@ ''' from . import exceptions +import pandas as pd import requests import tc_etl_lib as tc import time from typing import Any, Iterable +class SendBatchError(Exception): + "SendBatchError is a class that can handle exceptions." + def __init__(self, message, original_exception=None, index=None): + super().__init__(message) + self.original_exception = original_exception + self.index = index class IoT: """IoT is a class that allows us to communicate with the IoT Agent.""" @@ -38,10 +45,14 @@ def __init__(self): pass def send_json(self, - sensor_name: str, - api_key: str, - req_url: str, - data: Any) -> None: + sensor_name: str, + api_key: str, + req_url: str, + data: Any) -> None: + + if not isinstance(data, dict): + raise ValueError("The 'data' parameter should be a dictionary with key-value pairs.") + params = { 'i': sensor_name, 'k': api_key @@ -51,52 +62,38 @@ def send_json(self, } try: - # Verify if data is a single dictionary. - if isinstance(data, dict): - resp = requests.post(url=req_url, json=data, - params=params, headers=headers) - if resp.status_code == 200: - return True - else: - raise exceptions.FetchError( - response=resp, - method="POST", - url=req_url, - params=params, - headers=headers) + resp = requests.post(url=req_url, json=data, params=params, headers=headers) + if resp.status_code == 200: + return True else: - raise ValueError( - "The parameter 'data' should be a single dictionary {}.") + raise exceptions.FetchError( + response=resp, + method="POST", + url=req_url, + params=params, + headers=headers) except requests.exceptions.ConnectionError as e: raise e def send_batch(self, - sensor_name: str, - api_key: str, - req_url: str, - time_sleep: float, - data: Iterable[Any]) -> None: - params = { - 'i': sensor_name, - 'k': api_key - } - headers = { - "Content-Type": "application/json" - } - - try: - for i in range(0, len(data)): - resp = requests.post( - url=req_url, json=data[i], params=params, headers=headers) + sensor_name: str, + api_key: str, + req_url: str, + time_sleep: float, + data: Iterable[pd.DataFrame | dict]) -> None: - if resp.status_code == 200: - time.sleep(time_sleep) - return True - else: - raise exceptions.FetchError(response=resp, - method="POST", - url=req_url, - params=params, - headers=headers) - except requests.exceptions.ConnectionError as e: - raise e \ No newline at end of file + if isinstance(data, pd.DataFrame): + # Convierte cada fila del DataFrame a un diccionario. + for i, row in data.iterrows(): + try: + self.send_json(sensor_name, api_key, req_url, row.to_dict()) + time.sleep(time_sleep) + except Exception as e: + raise SendBatchError(f"send_batch error. Row that caused the error: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e + else: + for i, dictionary in enumerate(data): + try: + self.send_json(sensor_name, api_key, req_url, dictionary) + time.sleep(time_sleep) + except Exception as e: + raise SendBatchError(f"send_batch error. Index where the error occurred: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e \ No newline at end of file diff --git a/python-lib/tc_etl_lib/tc_etl_lib/test_iot.py b/python-lib/tc_etl_lib/tc_etl_lib/test_iot.py index e1c7033..44b6382 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/test_iot.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/test_iot.py @@ -21,11 +21,13 @@ IoT tests ''' -from . import IoT, exceptions +from . import exceptions +from tc_etl_lib.iot import SendBatchError, IoT +import pandas as pd import pytest import requests import unittest -from unittest.mock import patch, Mock +from unittest.mock import patch, Mock, MagicMock class TestIoT(unittest.TestCase): @@ -42,9 +44,8 @@ def test_send_json_success(self): "http://fakeurl.com", {"key": "value"}) assert resp == True - def test_send_json_connection_error(self): - """Should raise an exception if there is a server connection error.""" + """Should raise an exception when there is a server connection error.""" iot = IoT() with patch('requests.post') as mock_post: mock_post.side_effect = requests.exceptions.ConnectionError() @@ -55,9 +56,8 @@ def test_send_json_connection_error(self): "http://fakeurl.com", {"key": "value"}) - def test_send_json_invalid_data_type(self): - """Should raise an exception if the data type is icorrect.""" + """Should raise an exception when the data type is incorrect.""" iot = IoT() with pytest.raises(ValueError) as exc_info: iot.send_json("fake_sensor_name", @@ -65,10 +65,9 @@ def test_send_json_invalid_data_type(self): "http://fakeurl.com", ["data"]) exception_message = str(exc_info.value) - assert "The parameter 'data' should be a single dictionary {}." in str( + assert "The 'data' parameter should be a dictionary with key-value pairs." in str( exception_message) - def test_send_json_set_not_unique(self): """Should raise an exception if the data is an array of dictionaries.""" iot = IoT() @@ -79,12 +78,11 @@ def test_send_json_set_not_unique(self): {"key_1": "value_1"}, {"key_2": "value_2"}]) exception_message = str(exc_info.value) - assert "The parameter 'data' should be a single dictionary {}." in str( + assert "The 'data' parameter should be a dictionary with key-value pairs." in str( exception_message) - def test_send_json_unauthorized(self): - """Should raise an exception if the request is unauthorized.""" + """Should raise an exception when the request is unauthorized.""" iot = IoT() with patch('requests.post') as mock_post: mock_post.return_value.status_code = 401 @@ -97,9 +95,8 @@ def test_send_json_unauthorized(self): assert "401" in exception_raised assert "Failed to POST http://fakeurl.com" in exception_raised - def test_send_json_not_found(self): - """Should raise an exception if the request is not found.""" + """Should raise an exception when the request is not found.""" iot = IoT() with patch('requests.post') as mock_post: mock_post.return_value.status_code = 404 @@ -111,7 +108,6 @@ def test_send_json_not_found(self): assert "404" in exception_raised assert "Failed to POST http://fakeurl.com" in exception_raised - def test_send_json_server_error(self): """Should raise an exception if there is a server error.""" iot = IoT() @@ -127,97 +123,54 @@ def test_send_json_server_error(self): assert "500" in exception_raised assert "Failed to POST http://fakeurl.com" in exception_raised + def test_send_batch_dict_success(self): + """send_json should be called twice when we send an array with 2 dictionaries.""" + with patch('tc_etl_lib.iot.IoT.send_json') as mock_send_json: + iot = IoT() + iot.send_batch('fake_sensor_name', 'fake:api_key', 'http://fakeurl.com', 0.25, [{'key1': 'value1'}, {'key2': 'value2'}]) + self.assertEqual(mock_send_json.call_count, 2) - def test_send_batch_success(self): - """The status code should be 200 if a request is success.""" + def test_send_batch_dict_value_error(self): + """Should raise ValueError and then raise SendBatchError with the index that failed.""" iot = IoT() with patch('requests.post') as mock_post: - fake_response = Mock() - # Simulates a successful status code. - fake_response.status_code = 200 - mock_post.return_value = fake_response - resp = iot.send_batch("fake_sensor", - "fake_api_key", - "http://fakeurl.com", - 0.25, - [ - {"key_1": "value_1"}, - {"key_2", "value_2"} - ]) - - - assert resp == True + mock_post.return_value.status_code = 200 + with self.assertRaises(SendBatchError) as context: + iot.send_batch('fake_sensor_name', 'fake:api_key', 'http://fakeurl.com', 0.25, [{"key_1": "value_1"}, 2]) + expected_message = "send_batch error. Index where the error occurred: 1\nError detail: The 'data' parameter should be a dictionary with key-value pairs." + self.assertEqual(str(context.exception), expected_message) + # Verify that the POST request was called for the first dictionary. + mock_post.assert_called_once() def test_send_batch_connection_error(self): - """Should raise an exception if there is a connection error.""" - iot = IoT() - with patch('requests.post') as mock_post: - mock_post.side_effect = requests.exceptions.ConnectionError() - with pytest.raises(requests.exceptions.ConnectionError): - iot.send_batch( - "fake_sensor_name", - "fake_api_key", - "http://fakeurl.com", - 0.25, - [ - {"key_1": "value_1"}, - {"key_2": "value_2"} - ]) - - - - def test_send_batch_unauthorized(self): - """Should raise an exception if the request is unauthorized.""" - iot = IoT() - with patch('requests.post') as mock_post: - mock_post.return_value.status_code = 401 - with pytest.raises(exceptions.FetchError) as exc_info: - iot.send_batch("fake_sensor_name", - "fake_api_key", - "http://fakeurl.com", - 0.25, - [ - {"key_1": "value_1"}, - {"key_2": "value_2"}]) - exception_raised = str(exc_info.value) - assert "401" in exception_raised - assert "Failed to POST http://fakeurl.com" in exception_raised - - - def test_send_batch_not_found(self): - """Should raise an exception if the request is not found.""" - iot = IoT() - with patch('requests.post') as mock_post: - mock_post.return_value.status_code = 404 - with pytest.raises(exceptions.FetchError) as exc_info: - iot.send_batch("fake_sensor_name", - "fake_api_key", - "http://fakeurl.com", - 0.25, - [ - {"key_1": "value_1"}, - {"key_2": "value_2"}]) - - exception_raised = str(exc_info.value) - assert "404" in exception_raised - assert "Failed to POST http://fakeurl.com" in exception_raised - - - def test_send_batch_server_error(self): - """Should raise an exception if there is a server error.""" - iot = IoT() - with patch('requests.post') as mock_post: - mock_post.return_value.status_code = 500 - with pytest.raises(exceptions.FetchError) as exc_info: - iot.send_batch("fake_sensor_name", - "fake_api_key", - "http://fakeurl.com", - 0.25, - [ - {"key_1": "value_1"}, - {"key_2": "value_2"}]) - - exception_raised = str(exc_info.value) - assert "500" in exception_raised - assert "Failed to POST http://fakeurl.com" in exception_raised \ No newline at end of file + """Should raise a ConnectionError exception and then raise SedBatchError with the index that failed.""" + iot = IoT() + with patch('requests.post') as mock_post: + mock_success = MagicMock() + mock_success.status_code = 200 + + mock_connection_error = MagicMock(side_effect=ConnectionError("Simulated connection error")) + # Simulates a connection error when tries to send the second dictionary. + mock_post.side_effect = [mock_success, mock_connection_error] + + with self.assertRaises(SendBatchError) as exc_info: + iot.send_batch('fake_sensor_name', 'fake:api_key', 'http://fakeurl.com', 0.25, [{"key_1": "value_1"}, {"key_2": "value_2"}]) + + exception_raised = str(exc_info.exception) + + assert "send_batch error. Index where the error occurred: 1\nError detail:" in exception_raised + assert "Failed to POST" in exception_raised + # Verify that it tried to do a POST request for each dictionary. + assert mock_post.call_count == 2 + + def test_send_batch_data_frame_success(self): + """send_json should be called 3 times when we send a DataFrame with 3 rows.""" + with patch('tc_etl_lib.iot.IoT.send_json') as mock_send_json: + iot = IoT() + data = pd.DataFrame({ + 'column1': [1, 2, 3], + 'column2': ['a', 'b', 'c'] + }) + iot.send_batch('fake_sensor_name', 'fake:api_key', 'http://fakeurl.com', 0.25, data) + self.assertEqual(mock_send_json.call_count, 3) \ No newline at end of file From c8a86551c43b7112ac41625a589fe1cb66243cec Mon Sep 17 00:00:00 2001 From: cefili Date: Tue, 21 Nov 2023 10:55:41 +0100 Subject: [PATCH 3/8] send_batch returns True when success. --- python-lib/tc_etl_lib/tc_etl_lib/iot.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python-lib/tc_etl_lib/tc_etl_lib/iot.py b/python-lib/tc_etl_lib/tc_etl_lib/iot.py index bd183ba..f530b29 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/iot.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/iot.py @@ -29,7 +29,7 @@ import requests import tc_etl_lib as tc import time -from typing import Any, Iterable +from typing import Iterable, Union class SendBatchError(Exception): "SendBatchError is a class that can handle exceptions." @@ -48,7 +48,7 @@ def send_json(self, sensor_name: str, api_key: str, req_url: str, - data: Any) -> None: + data: dict) -> Union[None, bool]: if not isinstance(data, dict): raise ValueError("The 'data' parameter should be a dictionary with key-value pairs.") @@ -80,7 +80,7 @@ def send_batch(self, api_key: str, req_url: str, time_sleep: float, - data: Iterable[pd.DataFrame | dict]) -> None: + data: Iterable[pd.DataFrame | dict]) -> Union[None, bool]: if isinstance(data, pd.DataFrame): # Convierte cada fila del DataFrame a un diccionario. @@ -96,4 +96,5 @@ def send_batch(self, self.send_json(sensor_name, api_key, req_url, dictionary) time.sleep(time_sleep) except Exception as e: - raise SendBatchError(f"send_batch error. Index where the error occurred: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e \ No newline at end of file + raise SendBatchError(f"send_batch error. Index where the error occurred: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e + return True \ No newline at end of file From c1fdf40c63c30965008a9e067447c512f04d9e3b Mon Sep 17 00:00:00 2001 From: cefili Date: Wed, 22 Nov 2023 08:27:22 +0100 Subject: [PATCH 4/8] Correcting data parameter type. --- python-lib/tc_etl_lib/tc_etl_lib/iot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python-lib/tc_etl_lib/tc_etl_lib/iot.py b/python-lib/tc_etl_lib/tc_etl_lib/iot.py index f530b29..909f947 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/iot.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/iot.py @@ -80,7 +80,7 @@ def send_batch(self, api_key: str, req_url: str, time_sleep: float, - data: Iterable[pd.DataFrame | dict]) -> Union[None, bool]: + data: Iterable) -> Union[None, bool]: if isinstance(data, pd.DataFrame): # Convierte cada fila del DataFrame a un diccionario. From 1b09c523d8f63013d9258c81d01a40ddbe79235f Mon Sep 17 00:00:00 2001 From: cefili Date: Thu, 23 Nov 2023 12:09:46 +0100 Subject: [PATCH 5/8] Renombramientos. Correcta importacion de pandas. Ejemplos de uso. Links a tipos de excepciones. --- doc/best_practices.md | 6 +- etls/hello-world/requirements.txt | 3 +- python-lib/tc_etl_lib/README.md | 32 ++--- python-lib/tc_etl_lib/setup.py | 3 +- python-lib/tc_etl_lib/tc_etl_lib/__init__.py | 2 +- .../tc_etl_lib/tc_etl_lib/{iot.py => iota.py} | 62 +++++----- .../tc_etl_lib/{test_iot.py => test_iota.py} | 111 ++++++++---------- 7 files changed, 105 insertions(+), 114 deletions(-) rename python-lib/tc_etl_lib/tc_etl_lib/{iot.py => iota.py} (58%) rename python-lib/tc_etl_lib/tc_etl_lib/{test_iot.py => test_iota.py} (58%) diff --git a/doc/best_practices.md b/doc/best_practices.md index bfd9b36..3f254c2 100644 --- a/doc/best_practices.md +++ b/doc/best_practices.md @@ -48,9 +48,9 @@ Las funciones que actualmente soporta la librería son: - modulo `cb`: Módulo que incluye funciones relacionadas con la comunicación con el Context Broker. - `send_batch`: Función que envía un lote de entidades al Context Broker. Recibe un listado con todos los tokens por subservicio y usa el correspondiente para realizar la llamada al Context Broker. Si no se dispone de token o ha caducado, se solicita o renueva el token según el caso y luego envía los datos. - `get_entities_page`: Función que permite la recogida de datos del Context Broker. Permite el uso de ciertos parámetros como offset, limit, orderBy y type para filtrar la recogida de datos. - - modulo `iot`: Módulo que incluye funciones relacionadas con la comunicación del agente IoT. - - `send_json`: Función que envía un JSON al agente IoT. Recibe el nombre del sensor, su API key correspondiente, la URL del servidor y los datos a enviar. - - `send_batch`: Función que envía un archivo en formato JSON al agente IoT. Puede recibir una lista de diccionarios o un DataFrame. En el caso de DataFrames, convierte cada fila en un diccionario y los envía uno por uno. Recibe el nombre del sensor, su API key correspondiente, la URL del servidor, el tiempo de espera en segundos entre envío y envío y los datos a enviar. + - modulo `iota`: Módulo que incluye funciones relacionadas con la comunicación con el agente IoT. + - `send_http`: Función que envía un JSON al agente IoT por una petición HTTP. + - `send_batch_http`: Función que envía un un conjunto de datos en formato JSON al agente IoT por una petición HTTP. Puede recibir una lista de diccionarios o un DataFrame. En el caso de DataFrames, convierte cada fila en un diccionario y los envía uno por uno cada cierto tiempo definido en `sleep_send_batch`. Se puede encontrar más detalles de la librería en la documentación de esta. [Ref.](../python-lib/tc_etl_lib/README.md) diff --git a/etls/hello-world/requirements.txt b/etls/hello-world/requirements.txt index 64493d6..85fa9b0 100644 --- a/etls/hello-world/requirements.txt +++ b/etls/hello-world/requirements.txt @@ -1,3 +1,2 @@ # Include here the dependencies your ETL needs -#tc_etl_lib==0.9.0 -pandas==2.0.3 \ No newline at end of file +#tc_etl_lib==0.9.0 \ No newline at end of file diff --git a/python-lib/tc_etl_lib/README.md b/python-lib/tc_etl_lib/README.md index 351f16a..bda466c 100644 --- a/python-lib/tc_etl_lib/README.md +++ b/python-lib/tc_etl_lib/README.md @@ -112,6 +112,10 @@ entities = cbm.get_entities_page(subservice='/energia', auth=auth, type='myType' # have a look to the retrieved entities print (json.dumps(entities)) + +#create an iota manager and use it +iotam: tc.iota.iotaManager = tc.iota.iotaManager(endpoint = 'http://:/iot/json', sensor_id='', api_key='') +iotam.send_http(data={"": "", "": ""}) ``` Ejemplo de uso de Recogida de todos los datos de tipo SupplyPoint con y sin paginación: @@ -225,6 +229,10 @@ with tc.orionStore(cb=cbm, auth=auth, subservice='/energia') as store: # O un store sqlFile with tc.sqlFileStore(path="inserts.sql", subservice="/energia", namespace="energy") as store: store(entities) + +# Envío de datos en ráfaga al Agente IoT. +iotam: tc.iota.iotaManager = tc.iota.iotaManager(endpoint = 'http://:/iot/json', sensor_id='', api_key='', sleep_send_batch='') +iotam.send_batch_http(data=[{"": "", "": ""}, {"": "", "": ""}]) ``` ## Funciones disponibles en la librería @@ -343,26 +351,22 @@ La librería está creada con diferentes clases dependiendo de la funcionalidad - Reemplaza todos los espacios en blanco consecutivos por el carácter de reemplazo. - NOTA: Esta función no recorta la longitud de la cadena devuelta a 256 caracteres, porque el llamante puede querer conservar la cadena entera para por ejemplo guardarla en algún otro atributo, antes de truncarla. -- Clase `IoT`: En esta clase están las funciones relacionadas con el agente IoT. +- Clase `iotaManager`: En esta clase están las funciones relacionadas con el agente IoT. - `__init__`: constructor de objetos de la clase. - - `send_json`: Función que envía un archivo en formato JSON al agente IoT. - - :param obligatorio `sensor_name`: El nombre del sensor. + - :param obligatorio `sensor_id`: El ID del sensor. - :param obligatorio `api_key`: La API key correspondiente al sensor. - - :param obligatorio `req_url`: La URL del servicio al que se le quiere enviar los datos. + - :param obligatorio `endpoint`: La URL del servicio al que se le quiere enviar los datos. + - :param opcional `sleep_send_batch`: Es el tiempo de espera entre cada envío de datos en segundos. + - `send_http`: Función que envía un archivo en formato JSON al agente IoT por petición HTTP. - :param obligatorio: `data`: Datos a enviar. La estructura debe tener pares de elementos clave-valor (diccionario). - - :raises ValueError: Se lanza cuando los datos a enviar son distintos a un único diccionario. - - :raises Excepction: Se lanza una excepción ConnectionError cuando no puede conectarse al servidor. Se lanza una excepción FetchError cuando se produce un error en en la solicitud HTTP. - - :raises ValueError: Se lanza cuando el tipo de los datos a enviar es incorrecto. - - :raises Excepction: Se lanza una excepción ConnectionError cuando no puede conectarse al servidor. Se lanza una excepción FetchError cuando se produce un error en en la solicitud HTTP. + - :raises [TypeError](https://docs.python.org/3/library/exceptions.html#TypeError): Se lanza cuando el tipo de dato es incorrecto. + - :raises [ConnectionError](https://docs.python.org/3/library/exceptions.html#ConnectionError): Se lanza cuando no puede conectarse al servidor. + - :raises FetchError: se lanza cuando se produce un error en en la solicitud HTTP. - :return: True si el envío de datos es exitoso. - - `send_batch`: Función que envía un archivo en formato JSON al agente IoT. - - :param obligatorio `sensor_name`: El nombre del sensor. - - :param obligatorio `api_key`: La API key correspondiente al sensor. - - :param obligatorio `req_url`: La URL del servicio al que se le quiere enviar los datos. - - :param obligatorio: `time_sleep`: Es el tiempo de espera entre cada envío de datos en segundos. + - `send_batch_http`: Función que envía un conjunto de datos en formato JSON al agente IoT por petición HTTP. - :param obligatorio: `data`: Datos a enviar. Puede ser una lista de diccionarios o un DataFrame. - - :raises SendBatchError: Se levanta cuando se produce una excepción dentro de `send_json`. Atrapa la excepción original y se guarda y se imprime el índice donde se produjo el error. + - :raises SendBatchError: Se levanta cuando se produce una excepción dentro de `send_http`. Atrapa la excepción original y se guarda y se imprime el índice donde se produjo el error. Algunos ejemplos de uso de `normalizer`: diff --git a/python-lib/tc_etl_lib/setup.py b/python-lib/tc_etl_lib/setup.py index 68d0202..8d50a9b 100644 --- a/python-lib/tc_etl_lib/setup.py +++ b/python-lib/tc_etl_lib/setup.py @@ -38,7 +38,8 @@ INSTALL_REQUIRES = [ 'requests==2.21.0', 'urllib3==1.24.1', - 'psycopg2-binary>=2.9.5' + 'psycopg2-binary>=2.9.5', + 'pandas==2.0.3' ] setup( diff --git a/python-lib/tc_etl_lib/tc_etl_lib/__init__.py b/python-lib/tc_etl_lib/tc_etl_lib/__init__.py index 695ee29..6ac4e26 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/__init__.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/__init__.py @@ -22,6 +22,6 @@ from .auth import authManager from .cb import cbManager from .exceptions import FetchError -from .iot import IoT +from .iota import iotaManager from .store import Store, orionStore, sqlFileStore from .normalizer import normalizer diff --git a/python-lib/tc_etl_lib/tc_etl_lib/iot.py b/python-lib/tc_etl_lib/tc_etl_lib/iota.py similarity index 58% rename from python-lib/tc_etl_lib/tc_etl_lib/iot.py rename to python-lib/tc_etl_lib/tc_etl_lib/iota.py index 909f947..17f3bd5 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/iot.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/iota.py @@ -19,9 +19,9 @@ # along with IoT orchestrator. If not, see http://www.gnu.org/licenses/. ''' -IoT routines for Python: - - iot.send_json - - iot.send_batch +IoT Agent routines for Python: + - iotaManager.send_http + - iotaManager.send_batch_http ''' from . import exceptions @@ -29,7 +29,7 @@ import requests import tc_etl_lib as tc import time -from typing import Iterable, Union +from typing import Iterable, Optional, Union class SendBatchError(Exception): "SendBatchError is a class that can handle exceptions." @@ -38,63 +38,67 @@ def __init__(self, message, original_exception=None, index=None): self.original_exception = original_exception self.index = index -class IoT: - """IoT is a class that allows us to communicate with the IoT Agent.""" +class iotaManager: + """IoT Agent Manager. - def __init__(self): - pass + endpoint: define service endpoint cb (example: https://:) + timeout: timeout in seconds (default: 10) + """ - def send_json(self, - sensor_name: str, - api_key: str, - req_url: str, + endpoint: str + sensor_id: str + api_key: str + sleep_send_batch: float + + def __init__(self, endpoint: str, sensor_id: str, api_key: str, sleep_send_batch: float = 0): + self.endpoint = endpoint + self.sensor_id = sensor_id + self.api_key = api_key + self.sleep_send_batch = sleep_send_batch + + def send_http(self, data: dict) -> Union[None, bool]: if not isinstance(data, dict): - raise ValueError("The 'data' parameter should be a dictionary with key-value pairs.") + raise TypeError("The 'data' parameter should be a dictionary with key-value pairs.") params = { - 'i': sensor_name, - 'k': api_key + 'i': self.sensor_id, + 'k': self.api_key } headers = { "Content-Type": "application/json" } try: - resp = requests.post(url=req_url, json=data, params=params, headers=headers) + resp = requests.post(url=self.endpoint, json=data, params=params, headers=headers) if resp.status_code == 200: return True else: raise exceptions.FetchError( response=resp, method="POST", - url=req_url, + url=self.endpoint, params=params, headers=headers) except requests.exceptions.ConnectionError as e: raise e - def send_batch(self, - sensor_name: str, - api_key: str, - req_url: str, - time_sleep: float, - data: Iterable) -> Union[None, bool]: + def send_batch_http(self, data: Iterable) -> Union[None, bool]: if isinstance(data, pd.DataFrame): # Convierte cada fila del DataFrame a un diccionario. for i, row in data.iterrows(): try: - self.send_json(sensor_name, api_key, req_url, row.to_dict()) - time.sleep(time_sleep) + self.send_http(row.to_dict()) + time.sleep(self.sleep_send_batch) except Exception as e: - raise SendBatchError(f"send_batch error. Row that caused the error: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e + raise SendBatchError(f"send_batch_http error. Row that caused the error: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e else: for i, dictionary in enumerate(data): try: - self.send_json(sensor_name, api_key, req_url, dictionary) - time.sleep(time_sleep) + self.send_http(dictionary) + time.sleep(self.sleep_send_batch) except Exception as e: - raise SendBatchError(f"send_batch error. Index where the error occurred: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e + raise SendBatchError(f"send_batch_http error. Index where the error occurred: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e return True \ No newline at end of file diff --git a/python-lib/tc_etl_lib/tc_etl_lib/test_iot.py b/python-lib/tc_etl_lib/tc_etl_lib/test_iota.py similarity index 58% rename from python-lib/tc_etl_lib/tc_etl_lib/test_iot.py rename to python-lib/tc_etl_lib/tc_etl_lib/test_iota.py index 44b6382..d24cbda 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/test_iot.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/test_iota.py @@ -18,11 +18,11 @@ # along with IoT orchestrator. If not, see http://www.gnu.org/licenses/. ''' -IoT tests +IoT Agent tests ''' from . import exceptions -from tc_etl_lib.iot import SendBatchError, IoT +from tc_etl_lib.iota import SendBatchError, iotaManager import pandas as pd import pytest import requests @@ -30,122 +30,105 @@ from unittest.mock import patch, Mock, MagicMock -class TestIoT(unittest.TestCase): - def test_send_json_success(self): +class TestIotaManager(unittest.TestCase): + def test_send_http_success(self): """A success message should be displayed when HTTP request is executed successfully.""" - iot = IoT() + iot = iotaManager(endpoint='http://fakeurl.com', sensor_id='fake_sensor_id', api_key='fake_api_key') with patch('requests.post') as mock_post: fake_response = Mock() # Simulates a successful code status. fake_response.status_code = 200 mock_post.return_value = fake_response - resp = iot.send_json("fake_sensor", "fake_api_key", - "http://fakeurl.com", {"key": "value"}) + resp = iot.send_http(data={"key": "value"}) assert resp == True - def test_send_json_connection_error(self): + def test_send_http_connection_error(self): """Should raise an exception when there is a server connection error.""" - iot = IoT() + iot = iotaManager(endpoint='http://fakeurl.com', sensor_id='fake_sensor_id', api_key='fake_api_key') with patch('requests.post') as mock_post: mock_post.side_effect = requests.exceptions.ConnectionError() with pytest.raises(requests.exceptions.ConnectionError): - iot.send_json( - "fake_sensor_name", - "fake_api_key", - "http://fakeurl.com", - {"key": "value"}) + iot.send_http(data={"key": "value"}) - def test_send_json_invalid_data_type(self): + def test_send_http_invalid_data_type(self): """Should raise an exception when the data type is incorrect.""" - iot = IoT() - with pytest.raises(ValueError) as exc_info: - iot.send_json("fake_sensor_name", - "fake_api_key", - "http://fakeurl.com", - ["data"]) + iot = iotaManager(endpoint='http://fakeurl.com', sensor_id='fake_sensor_id', api_key='fake_api_key') + with pytest.raises(TypeError) as exc_info: + iot.send_http(data=["data"]) exception_message = str(exc_info.value) assert "The 'data' parameter should be a dictionary with key-value pairs." in str( exception_message) - def test_send_json_set_not_unique(self): + def test_send_http_set_not_unique(self): """Should raise an exception if the data is an array of dictionaries.""" - iot = IoT() - with pytest.raises(ValueError) as exc_info: - iot.send_json("fake_sensor_name", "fake_api_key", - "http://fakeurl.com", - [ + iot = iotaManager(endpoint='http://fakeurl.com', sensor_id='fake_sensor_id', api_key='fake_api_key') + with pytest.raises(TypeError) as exc_info: + iot.send_http(data=[ {"key_1": "value_1"}, {"key_2": "value_2"}]) exception_message = str(exc_info.value) assert "The 'data' parameter should be a dictionary with key-value pairs." in str( exception_message) - def test_send_json_unauthorized(self): + def test_send_http_unauthorized(self): """Should raise an exception when the request is unauthorized.""" - iot = IoT() + iot = iotaManager(endpoint='http://fakeurl.com', sensor_id='fake_sensor_id', api_key='fake_api_key') with patch('requests.post') as mock_post: mock_post.return_value.status_code = 401 with pytest.raises(exceptions.FetchError) as exc_info: - iot.send_json("fake_sensor_name", - "fake_api_key", - "http://fakeurl.com", - {"key": "value"}) + iot.send_http(data={"key": "value"}) exception_raised = str(exc_info.value) assert "401" in exception_raised assert "Failed to POST http://fakeurl.com" in exception_raised - def test_send_json_not_found(self): + def test_send_http_not_found(self): """Should raise an exception when the request is not found.""" - iot = IoT() + iot = iotaManager(endpoint='http://fakeurl.com', sensor_id='fake_sensor_id', api_key='fake_api_key') with patch('requests.post') as mock_post: mock_post.return_value.status_code = 404 with pytest.raises(exceptions.FetchError) as exc_info: - iot.send_json("fake_sensor_name", "fake_api_key", - "http://fakeurl.com", {"key": "value"}) + iot.send_http(data={"key": "value"}) exception_raised = str(exc_info.value) assert "404" in exception_raised assert "Failed to POST http://fakeurl.com" in exception_raised - def test_send_json_server_error(self): + def test_send_http_server_error(self): """Should raise an exception if there is a server error.""" - iot = IoT() + iot = iotaManager(endpoint='http://fakeurl.com', sensor_id='fake_sensor_id', api_key='fake_api_key') with patch('requests.post') as mock_post: mock_post.return_value.status_code = 500 with pytest.raises(exceptions.FetchError) as exc_info: - iot.send_json("fake_sensor_name", - "fake_api_key", - "http://fakeurl.com", - {"key": "value"}) + iot.send_http(data={"key": "value"}) exception_raised = str(exc_info.value) assert "500" in exception_raised assert "Failed to POST http://fakeurl.com" in exception_raised - def test_send_batch_dict_success(self): - """send_json should be called twice when we send an array with 2 dictionaries.""" - with patch('tc_etl_lib.iot.IoT.send_json') as mock_send_json: - iot = IoT() - iot.send_batch('fake_sensor_name', 'fake:api_key', 'http://fakeurl.com', 0.25, [{'key1': 'value1'}, {'key2': 'value2'}]) - self.assertEqual(mock_send_json.call_count, 2) + def test_send_batch_http_dict_success(self): + """send_http should be called twice when we send an array with 2 dictionaries.""" + with patch('tc_etl_lib.iota.iotaManager.send_http') as mock_send_http: + iot = iotaManager(endpoint='http://fakeurl.com', sensor_id='fake_sensor_id', api_key='fake_api_key', sleep_send_batch=0.25) + iot.send_batch_http(data=[{'key1': 'value1'}, {'key2': 'value2'}]) + self.assertEqual(mock_send_http.call_count, 2) - def test_send_batch_dict_value_error(self): - """Should raise ValueError and then raise SendBatchError with the index that failed.""" - iot = IoT() + def test_send_batch_http_dict_value_error(self): + """Should raise TypeError and then raise SendBatchError with the index that failed.""" + iot = iotaManager(endpoint='http://fakeurl.com', sensor_id='fake_sensor_id', api_key='fake_api_key', sleep_send_batch=0.25) with patch('requests.post') as mock_post: mock_post.return_value.status_code = 200 with self.assertRaises(SendBatchError) as context: - iot.send_batch('fake_sensor_name', 'fake:api_key', 'http://fakeurl.com', 0.25, [{"key_1": "value_1"}, 2]) + iot.send_batch_http(data=[{"key_1": "value_1"}, 2]) - expected_message = "send_batch error. Index where the error occurred: 1\nError detail: The 'data' parameter should be a dictionary with key-value pairs." + expected_message = "send_batch_http error. Index where the error occurred: 1\nError detail: The 'data' parameter should be a dictionary with key-value pairs." self.assertEqual(str(context.exception), expected_message) # Verify that the POST request was called for the first dictionary. mock_post.assert_called_once() - def test_send_batch_connection_error(self): + def test_send_batch_http_connection_error(self): """Should raise a ConnectionError exception and then raise SedBatchError with the index that failed.""" - iot = IoT() + iot = iotaManager(endpoint='http://fakeurl.com', sensor_id='fake_sensor_id', api_key='fake_api_key', sleep_send_batch=0.25) with patch('requests.post') as mock_post: mock_success = MagicMock() mock_success.status_code = 200 @@ -155,22 +138,22 @@ def test_send_batch_connection_error(self): mock_post.side_effect = [mock_success, mock_connection_error] with self.assertRaises(SendBatchError) as exc_info: - iot.send_batch('fake_sensor_name', 'fake:api_key', 'http://fakeurl.com', 0.25, [{"key_1": "value_1"}, {"key_2": "value_2"}]) + iot.send_batch_http(data=[{"key_1": "value_1"}, {"key_2": "value_2"}]) exception_raised = str(exc_info.exception) - assert "send_batch error. Index where the error occurred: 1\nError detail:" in exception_raised + assert "send_batch_http error. Index where the error occurred: 1\nError detail:" in exception_raised assert "Failed to POST" in exception_raised # Verify that it tried to do a POST request for each dictionary. assert mock_post.call_count == 2 - def test_send_batch_data_frame_success(self): - """send_json should be called 3 times when we send a DataFrame with 3 rows.""" - with patch('tc_etl_lib.iot.IoT.send_json') as mock_send_json: - iot = IoT() + def test_send_batch_http_data_frame_success(self): + """send_http should be called 3 times when we send a DataFrame with 3 rows.""" + with patch('tc_etl_lib.iota.iotaManager.send_http') as mock_send_http: + iot = iotaManager(endpoint='http://fakeurl.com', sensor_id='fake_sensor_id', api_key='fake_api_key', sleep_send_batch=0.25) data = pd.DataFrame({ 'column1': [1, 2, 3], 'column2': ['a', 'b', 'c'] }) - iot.send_batch('fake_sensor_name', 'fake:api_key', 'http://fakeurl.com', 0.25, data) - self.assertEqual(mock_send_json.call_count, 3) \ No newline at end of file + iot.send_batch_http(data=data) + self.assertEqual(mock_send_http.call_count, 3) \ No newline at end of file From 7540295a2ba3a7bdde2e9dcc14c962cd2563f3fc Mon Sep 17 00:00:00 2001 From: cefili Date: Thu, 23 Nov 2023 12:15:14 +0100 Subject: [PATCH 6/8] Corrigiendo comentarios. --- python-lib/tc_etl_lib/tc_etl_lib/iota.py | 8 +++++--- python-lib/tc_etl_lib/tc_etl_lib/test_iota.py | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/python-lib/tc_etl_lib/tc_etl_lib/iota.py b/python-lib/tc_etl_lib/tc_etl_lib/iota.py index 17f3bd5..6b86e50 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/iota.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/iota.py @@ -41,8 +41,10 @@ def __init__(self, message, original_exception=None, index=None): class iotaManager: """IoT Agent Manager. - endpoint: define service endpoint cb (example: https://:) - timeout: timeout in seconds (default: 10) + endpoint: define service endpoint iota (example: https://:). + sensor_id: sensor ID. + api_key: API key of the corresponding sensor. + sleep_send_batch: time sleep in seconds (default: 0). """ endpoint: str @@ -87,7 +89,7 @@ def send_http(self, def send_batch_http(self, data: Iterable) -> Union[None, bool]: if isinstance(data, pd.DataFrame): - # Convierte cada fila del DataFrame a un diccionario. + # Convert each row of the DataFrame to a dictionary. for i, row in data.iterrows(): try: self.send_http(row.to_dict()) diff --git a/python-lib/tc_etl_lib/tc_etl_lib/test_iota.py b/python-lib/tc_etl_lib/tc_etl_lib/test_iota.py index d24cbda..eeeed28 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/test_iota.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/test_iota.py @@ -18,7 +18,7 @@ # along with IoT orchestrator. If not, see http://www.gnu.org/licenses/. ''' -IoT Agent tests +IoT Agent tests. ''' from . import exceptions From 743394f336c6c82a4c1bc3c56b55574e4e069ec6 Mon Sep 17 00:00:00 2001 From: "DOMGAVSA\\cefiro" Date: Mon, 27 Nov 2023 14:05:26 +0100 Subject: [PATCH 7/8] Ejemplo de uso aparte. Se agrega el log del cambio. --- python-lib/tc_etl_lib/README.md | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/python-lib/tc_etl_lib/README.md b/python-lib/tc_etl_lib/README.md index bda466c..145e66b 100644 --- a/python-lib/tc_etl_lib/README.md +++ b/python-lib/tc_etl_lib/README.md @@ -112,10 +112,6 @@ entities = cbm.get_entities_page(subservice='/energia', auth=auth, type='myType' # have a look to the retrieved entities print (json.dumps(entities)) - -#create an iota manager and use it -iotam: tc.iota.iotaManager = tc.iota.iotaManager(endpoint = 'http://:/iot/json', sensor_id='', api_key='') -iotam.send_http(data={"": "", "": ""}) ``` Ejemplo de uso de Recogida de todos los datos de tipo SupplyPoint con y sin paginación: @@ -229,6 +225,14 @@ with tc.orionStore(cb=cbm, auth=auth, subservice='/energia') as store: # O un store sqlFile with tc.sqlFileStore(path="inserts.sql", subservice="/energia", namespace="energy") as store: store(entities) +``` + +Ejemplo de uso de la clase iotaManager + +``` +#create an iota manager and use it +iotam: tc.iota.iotaManager = tc.iota.iotaManager(endpoint = 'http://:/iot/json', sensor_id='', api_key='') +iotam.send_http(data={"": "", "": ""}) # Envío de datos en ráfaga al Agente IoT. iotam: tc.iota.iotaManager = tc.iota.iotaManager(endpoint = 'http://:/iot/json', sensor_id='', api_key='', sleep_send_batch='') @@ -357,7 +361,7 @@ La librería está creada con diferentes clases dependiendo de la funcionalidad - :param obligatorio `sensor_id`: El ID del sensor. - :param obligatorio `api_key`: La API key correspondiente al sensor. - :param obligatorio `endpoint`: La URL del servicio al que se le quiere enviar los datos. - - :param opcional `sleep_send_batch`: Es el tiempo de espera entre cada envío de datos en segundos. + - :param opcional `sleep_send_batch`: Es el tiempo de espera entre cada envío de datos en segundos (default: 0). - `send_http`: Función que envía un archivo en formato JSON al agente IoT por petición HTTP. - :param obligatorio: `data`: Datos a enviar. La estructura debe tener pares de elementos clave-valor (diccionario). - :raises [TypeError](https://docs.python.org/3/library/exceptions.html#TypeError): Se lanza cuando el tipo de dato es incorrecto. @@ -501,6 +505,10 @@ TOTAL 403 221 45% ## Changelog +0.10.0 (November 27th, 2023) + +- Add: new class `iotaManager` to deal with IoT Agent interactions, with methods `send_http` and `send_batch_http`([#70](https://github.com/telefonicasc/etl-framework/pull/70)) + 0.9.0 (May 16th, 2023) - Add: new class `normalizer` to clean up text strings to be used as NGSI entity IDs, by replacing or removing forbidden characters ([#54](https://github.com/telefonicasc/etl-framework/pull/54)) From 13046dec4a080bd7a077a8bc1803f8a6241abf3d Mon Sep 17 00:00:00 2001 From: "DOMGAVSA\\cefiro" Date: Tue, 28 Nov 2023 07:55:58 +0100 Subject: [PATCH 8/8] =?UTF-8?q?Se=20quit=C3=B3=20versi=C3=B3n=20y=20fecha.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python-lib/tc_etl_lib/README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/python-lib/tc_etl_lib/README.md b/python-lib/tc_etl_lib/README.md index 145e66b..6e36854 100644 --- a/python-lib/tc_etl_lib/README.md +++ b/python-lib/tc_etl_lib/README.md @@ -505,8 +505,6 @@ TOTAL 403 221 45% ## Changelog -0.10.0 (November 27th, 2023) - - Add: new class `iotaManager` to deal with IoT Agent interactions, with methods `send_http` and `send_batch_http`([#70](https://github.com/telefonicasc/etl-framework/pull/70)) 0.9.0 (May 16th, 2023)