diff --git a/python-lib/tc_etl_lib/README.md b/python-lib/tc_etl_lib/README.md index b9b4f9d..421122a 100644 --- a/python-lib/tc_etl_lib/README.md +++ b/python-lib/tc_etl_lib/README.md @@ -259,6 +259,7 @@ La librería está creada con diferentes clases dependiendo de la funcionalidad - :param opcional `service`: Se puede indicar al servicio al que se le quiere enviar los datos. Sino se indica, usará el que haya inicializado en el objeto authManager. Sino dispone ninguno de los dos definidos, lanzará un ValueError indicando que necesita definirse el servicio. - :param opcional `subservice`: Se puede indicar al subservicio al que se le quiere enviar los datos. Sino se indica, usará el que haya inicializado en el objeto authManager. Sino dispone ninguno de los dos definidos, lanzará un ValueError indicando que necesita definirse el subservicio. - :param opcional `actionType`: El tipo de acción que se le va aplicar al batch que se envía al Context Broker. Por defecto es `append`. Referencia en [API NGSIv2 de Orion](http://telefonicaid.github.io/fiware-orion/api/v2/stable/) + - :param opcional `options`: Lista de opciones separadas que recibe el Context Broker y que permite cierto comportamiento. Se pueden ver las opciones disponibles en [API NGSIv2 de Orion](https://github.com/telefonicaid/fiware-orion/blob/master/doc/manuals/orion-api.md#update-post-v2opupdate). En el caso de que la opcion `flowControl` se especifique dentro de este parámetro, un `cb_flowcontrol` (en la inicializción de cbManager) a `False` se ignora, quedanco como si `cb_flowcontrol` se hubiese establecido a `True`. - :raises [ValueError](https://docs.python.org/3/library/exceptions.html#ValueError): Se lanza cuando le falta algún argumento o inicializar alguna varibale del objeto cbManager, para poder realizar la autenticación o envío de datos. - :raises [Exception](https://docs.python.org/3/library/exceptions.html#Exception): Se lanza cuando el servicio de Context Broker, responde con un error concreto. - :return: True si la operación es correcta (i.e. el CB devolió un code http 204). @@ -276,6 +277,7 @@ La librería está creada con diferentes clases dependiendo de la funcionalidad - :param opcional `geometry`: Cuando se define un filtro de datos por geolocalización, se ha de especificar un tipo de dibujo que se utiliza para resolver el filtrado. Se pueden consultar los diferentes valores de geometry en [NGSIv2 API](http://telefonicaid.github.io/fiware-orion/api/v2/stable) - :param opcional `coords`: Cuando se define un filtro de datos por geolocalización, se ha de especificar una lista de coordenadas geograficas separadas por coma. Se pueden consultar los diferentes valores de coords en [NGSIv2 API](http://telefonicaid.github.io/fiware-orion/api/v2/stable) - :param opcional `id`: Se establece un filtro por Identificador. Si no se especifica, enviará la petición al Context Broker sin el id definido, por lo tanto los datos no serán filtrados por identificador. + - :param opcional `options`: Lista de opciones que recibe el Context Broker y que permite cierto comportamiento. Se pueden ver las opciones disponibles en [API NGSIv2 de Orion](https://github.com/telefonicaid/fiware-orion/blob/master/doc/manuals/orion-api.md#list-entities-get-v2entities) - :raises [ValueError](https://docs.python.org/3/library/exceptions.html#ValueError): Se lanza cuando le falta algún argumento o inicializar alguna varibale del objeto cbManager, para poder realizar la autenticación o envío de datos. - :raises FetchError: Se lanza cuando el servicio de Context Broker, responde con un error concreto. - :return: array de datos cuyos elementos son objeto que representan entidades, según el formato descrito en la sección @@ -293,6 +295,7 @@ La librería está creada con diferentes clases dependiendo de la funcionalidad - :param opcional `geometry`: Cuando se define un filtro de datos por geolocalización, se ha de especificar un tipo de dibujo que se utiliza para resolver el filtrado. Se pueden consultar los diferentes valores de geometry en [NGSIv2 API](http://telefonicaid.github.io/fiware-orion/api/v2/stable) - :param opcional `coords`: Cuando se define un filtro de datos por geolocalización, se ha de especificar una lista de coordenadas geograficas separadas por coma. Se pueden consultar los diferentes valores de coords en [NGSIv2 API](http://telefonicaid.github.io/fiware-orion/api/v2/stable) - :param opcional `id`: Se establece un filtro por Identificador. Si no se especifica, enviará la petición al Context Broker sin el id definido, por lo tanto los datos no serán filtrados por identificador. + - :param opcional `options`: Lista de opciones que recibe el Context Broker. Se pueden ver las opciones disponibles en [API NGSIv2 de Orion](https://github.com/telefonicaid/fiware-orion/blob/master/doc/manuals/orion-api.md#list-entities-get-v2entities) - :raises [ValueError](https://docs.python.org/3/library/exceptions.html#ValueError): Se lanza cuando le falta algún argumento o inicializar alguna varibale del objeto cbManager, para poder realizar la autenticación o envío de datos. - :raises FetchError: Se lanza cuando el servicio de Context Broker, responde con un error concreto. - :return: array de datos cuyos elementos son objeto que representan entidades, según el formato descrito en la sección @@ -309,11 +312,17 @@ La librería está creada con diferentes clases dependiendo de la funcionalidad - :param opcional `geometry`: Cuando se define un filtro de datos por geolocalización, se ha de especificar un tipo de dibujo que se utiliza para resolver el filtrado. Se pueden consultar los diferentes valores de geometry en [NGSIv2 API](http://telefonicaid.github.io/fiware-orion/api/v2/stable) - :param opcional `coords`: Cuando se define un filtro de datos por geolocalización, se ha de especificar una lista de coordenadas geograficas separadas por coma. Se pueden consultar los diferentes valores de coords en [NGSIv2 API](http://telefonicaid.github.io/fiware-orion/api/v2/stable) - :param opcional `id`: Se establece un filtro por Identificador. Si no se especifica, enviará la petición al Context Broker sin el id definido, por lo tanto los datos no serán filtrados por identificador. + - :param opcional `options_get`: Cadena de opciones separadas por coma, que recibe el Context Broker cuando va recoger las entidades a eliminar. Se pueden ver las opciones disponibles en [API NGSIv2 de Orion](https://github.com/telefonicaid/fiware-orion/blob/master/doc/manuals/orion-api.md#list-entities-get-v2entities). + - :param opcional `options_send`: Lista de opciones que recibe el Context Broker cuando va a eliminar las entidades. Se pueden ver las opciones disponibles en [API NGSIv2 de Orion](https://github.com/telefonicaid/fiware-orion/blob/master/doc/manuals/orion-api.md#update-post-v2opupdate). En el caso de que la opcion `flowControl` se especifique dentro de este parámetro, un `cb_flowcontrol` (en la inicializción de cbManager) a `False` se ignora, quedanco como si `cb_flowcontrol` se hubiese establecido a `True`. - :raises [ValueError](https://docs.python.org/3/library/exceptions.html#ValueError): Se lanza cuando le falta algún argumento o inicializar alguna varibale del objeto cbManager, para poder realizar la autenticación o envío de datos. - :raises FetchError: Se lanza cuando el servicio de Context Broker, responde con un error concreto. ## Changelog +- Add: new optional parameter called `options` in get_entities, get_entities_page and send_batch ([#38](https://github.com/telefonicasc/etl-framework/issues/38)) +- Add: new optionals parameters called `options_send` and `options_get` in delete_entities ([#38](https://github.com/telefonicasc/etl-framework/issues/38)) +- Fix: cleaner logs, avoiding printing warnings in the case of unsecure CB API calls + 0.4.0 (August 31st, 2022) - Add: new optional parameter called `service` in get_entities, get_entities_page, delete_entities and send_batch diff --git a/python-lib/tc_etl_lib/tc_etl_lib/cb.py b/python-lib/tc_etl_lib/tc_etl_lib/cb.py index e051f9a..4a7f0dd 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/cb.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/cb.py @@ -22,6 +22,8 @@ ContextBroker routines for Python: - cbManager.send_batch - cbManager.get_entities_page + - cbManager.get_entities + - cbManager.delete_entities ''' import requests from requests.adapters import HTTPAdapter @@ -34,6 +36,9 @@ from . import authManager +# control urllib3 post and get verify in false +import urllib3, urllib3.exceptions +urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning) logger = logging.getLogger(__name__) @@ -95,7 +100,7 @@ def __init__(self,*, endpoint: str = None, timeout: int = 10, post_retry_connect self.block_size = block_size - def delete_entities(self, *, service: str = None, subservice: str = None, auth: authManager = None, limit: int = 100, type: str = None, q: str = None, mq: str = None, georel: str = None, geometry: str = None, coords: str = None, id: str = None): + def delete_entities(self, *, service: str = None, subservice: str = None, auth: authManager = None, limit: int = 100, type: str = None, q: str = None, mq: str = None, georel: str = None, geometry: str = None, coords: str = None, id: str = None, options_get: list = [], options_send: list = []): """Delete data from context broker :param service: Define service from which entities are deleted, defaults to None @@ -109,10 +114,12 @@ def delete_entities(self, *, service: str = None, subservice: str = None, auth: :param geometry: Allows to define the reference shape to be used when resolving the query. (point | polygon | line | box), defaults to None :param coords: Must be a string containing a semicolon-separated list of pairs of geographical coordinates in accordance with the geometry specified, defaults to None :param id: Delete entities filtering by Identity, defaults to None + :param options_get: Options used in Context Broker to find entities, defaults to None + :param options_send: Options used in Context Broker to delete entities, defaults to None :raises ValueError: is thrown when some required argument is missing :raises FetchError: is thrown when the response from the cb indicates an error """ - data = self.get_entities(service=service, subservice=subservice, auth=auth, limit = limit, type = type, q = q, mq = mq, georel = georel, geometry = geometry, coords = coords, id = id) + data = self.get_entities(service=service, subservice=subservice, auth=auth, limit = limit, type = type, q = q, mq = mq, georel = georel, geometry = geometry, coords = coords, id = id, options=options_get) entities = [] for i, item in enumerate(data): @@ -122,9 +129,9 @@ def delete_entities(self, *, service: str = None, subservice: str = None, auth: } entities.append(entity) - self.send_batch(service=service, subservice=subservice, auth=auth, entities=entities, actionType='delete') + self.send_batch(service=service, subservice=subservice, auth=auth, entities=entities, actionType='delete', options=options_send) - def get_entities(self, *, service: str = None, subservice: str = None, auth: authManager = None, limit: int = 100, type: str = None, orderBy: str = None, q: str = None, mq: str = None, georel: str = None, geometry: str = None, coords: str = None, id: str = None): + def get_entities(self, *, service: str = None, subservice: str = None, auth: authManager = None, limit: int = 100, type: str = None, orderBy: str = None, q: str = None, mq: str = None, georel: str = None, geometry: str = None, coords: str = None, id: str = None, options: list = []): """Retrieve data from context broker :param service: Define service from which entities are retrieved, defaults to None @@ -139,6 +146,7 @@ def get_entities(self, *, service: str = None, subservice: str = None, auth: aut :param geometry: Allows to define the reference shape to be used when resolving the query. (point | polygon | line | box), defaults to None :param coords: Must be a string containing a semicolon-separated list of pairs of geographical coordinates in accordance with the geometry specified, defaults to None :param id: Retrieve entities filtering by Identity, defaults to None + :param options: Options used to retrive entities, defaults to None :raises ValueError: is thrown when some required argument is missing :raises FetchError: is thrown when the response from the cb indicates an error :return: json data @@ -150,12 +158,12 @@ def get_entities(self, *, service: str = None, subservice: str = None, auth: aut data = ['go!'] while (data != []) : offset = (pg-1)*limit - data = self.get_entities_page(service=service, subservice=subservice, auth=auth, offset = offset, limit = limit, type = type, orderBy = orderBy, q = q, mq = mq, georel = georel, geometry = geometry, coords = coords, id = id) + data = self.get_entities_page(service=service, subservice=subservice, auth=auth, offset = offset, limit = limit, type = type, orderBy = orderBy, q = q, mq = mq, georel = georel, geometry = geometry, coords = coords, id = id, options = options) pg += 1 result += data return result - def get_entities_page(self, *, service:str = None, subservice: str = None, auth: authManager = None, offset: int = None, limit: int = None, type: str = None, orderBy: str = None, q: str = None, mq: str = None, georel: str = None, geometry: str = None, coords: str = None, id: str = None): + def get_entities_page(self, *, service:str = None, subservice: str = None, auth: authManager = None, offset: int = None, limit: int = None, type: str = None, orderBy: str = None, q: str = None, mq: str = None, georel: str = None, geometry: str = None, coords: str = None, id: str = None, options: list = []): """Retrieve data from context broker :param service: Define service from which entities are retrieved, defaults to None or auth.service defined value @@ -171,6 +179,7 @@ def get_entities_page(self, *, service:str = None, subservice: str = None, auth: :param geometry: Allows to define the reference shape to be used when resolving the query. (point | polygon | line | box), defaults to None :param coords: Must be a string containing a semicolon-separated list of pairs of geographical coordinates in accordance with the geometry specified, defaults to None :param id: Retrieve entities filtering by Identity, defaults to None + :param options: Options used, defaults to None :raises ValueError: is thrown when some required argument is missing :raises FetchError: is thrown when the response from the cb indicates an error :return: json data @@ -216,8 +225,15 @@ def get_entities_page(self, *, service:str = None, subservice: str = None, auth: else: raise ValueError('If use geographical queries, you must define georel, geometry and coords in params') + params = {"offset": offset, "limit": limit, "type": type, "orderBy": orderBy, "q": q, "mq": mq, "georel": georel, "geometry": geometry, "coords": coords, "id": id} - req_url = f"{self.endpoint}/v2/entities" + + req_url = "" + if (options != None and len(options) > 0): + req_url = f"{self.endpoint}/v2/entities?options={','.join(options)}" + else: + req_url = f"{self.endpoint}/v2/entities" + resp = requests.get(req_url, params=params, headers=headers, verify=False, timeout=self.timeout) if resp.status_code == 400 or resp.status_code == 401: respjson = resp.json() @@ -228,7 +244,7 @@ def get_entities_page(self, *, service:str = None, subservice: str = None, auth: return resp.json() - def send_batch(self, *, service:str = None, subservice: str = None, auth: authManager = None, entities: str, actionType: str = 'append') -> bool: + def send_batch(self, *, service:str = None, subservice: str = None, auth: authManager = None, entities: str, actionType: str = 'append', options: list = []) -> bool: """Send batch data to context broker with block control :param auth: Define authManager @@ -236,6 +252,7 @@ def send_batch(self, *, service:str = None, subservice: str = None, auth: authMa :param service: Define service to send batch data, defaults to None :param subservice: Define subservice to send batch data, defaults to None :param actionType: Batch action type, defaults is append + :param options: Options used, defaults to None :raises ValueError: is thrown when some required argument is missing :raises Exception: is thrown when the cotext broker response isn't ok operation :return: True if the operation is correct @@ -248,16 +265,16 @@ def send_batch(self, *, service:str = None, subservice: str = None, auth: authMa if accumulated_block > self.block_size: logger.debug(f'- Sending a batch {actionType} of {len(entitiesToSend)} entities') - self.__send_batch(auth=auth, service=service, subservice=subservice, entities=entitiesToSend, actionType=actionType) + self.__send_batch(auth=auth, service=service, subservice=subservice, entities=entitiesToSend, actionType=actionType, options=options) entitiesToSend = [] accumulated_block = 0 # Remaining block, if any if accumulated_block > 0: logger.debug(f'- Sending final batch {actionType} of {len(entitiesToSend)} entities') - self.__send_batch(auth=auth, service=service, subservice=subservice, entities=entitiesToSend, actionType=actionType) + self.__send_batch(auth=auth, service=service, subservice=subservice, entities=entitiesToSend, actionType=actionType, options=options) - def __send_batch(self, *, service:str = None, subservice: str = None, auth: authManager = None, entities: str, actionType: str = 'append') -> bool: + def __send_batch(self, *, service:str = None, subservice: str = None, auth: authManager = None, entities: str, actionType: str = 'append', options: list = []) -> bool: """Send batch data to context broker :param auth: Define authManager @@ -265,6 +282,7 @@ def __send_batch(self, *, service:str = None, subservice: str = None, auth: auth :param service: Define service to send batch data, defaults to None :param subservice: Define subservice to send batch data, defaults to None or auth.subservice defined value :param actionType: Batch action type, defaults is append + :param options: Options used, defaults to None :raises ValueError: is thrown when some required argument is missing :raises Exception: is thrown when the cotext broker response isn't ok operation :return: True if the operation is correct @@ -286,10 +304,10 @@ def __send_batch(self, *, service:str = None, subservice: str = None, auth: auth if (auth != None and subservice not in auth.tokens.keys()): auth.get_auth_token_subservice(subservice = subservice) - res = self.__batch_creation(auth=auth, service=service, subservice = subservice, entities=entities, actionType=actionType) + res = self.__batch_creation(auth=auth, service=service, subservice = subservice, entities=entities, actionType=actionType, options=options) if (auth != None and res.status_code == 401): auth.get_auth_token_subservice(subservice = subservice) - res = self.__batch_creation(auth=auth, service=service, subservice = subservice, entities=entities, actionType=actionType) + res = self.__batch_creation(auth=auth, service=service, subservice = subservice, entities=entities, actionType=actionType, options=options) if res.status_code != 204: raise Exception(f'Error in batch {actionType} operation ({res.status_code}): {res.json()}') @@ -301,7 +319,7 @@ def __send_batch(self, *, service:str = None, subservice: str = None, auth: auth return True - def __batch_creation(self, *, service: str = None, subservice: str = None, auth: authManager = None, entities: str, actionType: str = 'append'): + def __batch_creation(self, *, service: str = None, subservice: str = None, auth: authManager = None, entities: str, actionType: str = 'append', options: list = []): """Send batch data to Context Broker :param entities: Entities data @@ -309,6 +327,7 @@ def __batch_creation(self, *, service: str = None, subservice: str = None, auth: :param subservice: Define subservice to send batch data, defaults to None or auth.subservice defined value :param auth: Define authManager, defaults to None :param actionType: Batch action type, defaults is append + :params options: Options used, default to None :raises ValueError: is thrown when some required argument is missing :return: Http response code """ @@ -349,11 +368,20 @@ def __batch_creation(self, *, service: str = None, subservice: str = None, auth: 'actionType': f'{actionType}', 'entities': entities } - + + # if cb_flowcontrol, add flowControl flag to options if (self.cb_flowcontrol): - req_url = f'{self.endpoint}/v2/op/update?options=flowControl' - else: - req_url = f'{self.endpoint}/v2/op/update' + if (options == None): + options = ['flowControl'] + else: + # check if flowcontrol is in options. + if 'flowControl' not in options: + options.append('flowControl') + + if (options != None and len(options) > 0): + req_url = f"{self.endpoint}/v2/op/update?options={','.join(options)}" + else: + req_url = f"{self.endpoint}/v2/op/update" http = requests.Session() retry_strategy = Retry(