From 952bca1db13861a7be54210655e318584401bee4 Mon Sep 17 00:00:00 2001
From: "gintautas.poderys"
Date: Mon, 6 Jan 2025 15:41:12 +0200
Subject: [PATCH 1/3] replaced model outage crosschecking and fixing implementation

---
 .../model_merger/model_merger.py    | 13 +++-
 .../model_merger/temporary_fixes.py | 59 +++++++++++++++++++
 2 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/emf/loadflow_tool/model_merger/model_merger.py b/emf/loadflow_tool/model_merger/model_merger.py
index 7302572..3fd2241 100644
--- a/emf/loadflow_tool/model_merger/model_merger.py
+++ b/emf/loadflow_tool/model_merger/model_merger.py
@@ -24,7 +24,7 @@
 # TODO - move this async solution to some common module
 from concurrent.futures import ThreadPoolExecutor
 from lxml import etree
-from emf.loadflow_tool.model_merger.temporary_fixes import run_post_merge_processing, run_pre_merge_processing
+from emf.loadflow_tool.model_merger.temporary_fixes import run_post_merge_processing, run_pre_merge_processing, fix_model_outages
 
 logger = logging.getLogger(__name__)
 parse_app_properties(caller_globals=globals(), path=config.paths.cgm_worker.merger)
@@ -72,7 +72,10 @@ def handle(self, task_object: dict, **kwargs):
                      "exclusion_reason": [],
                      "replacement": False,
                      "replaced_entity": [],
-                     "replacement_reason": []}
+                     "replacement_reason": [],
+                     "outages_corrected": False,
+                     "outages_applied": [],
+                     "outages_unmapped": []}
 
         # Parse relevant data from Task
         task = task_object
@@ -222,6 +225,12 @@ def handle(self, task_object: dict, **kwargs):
             merged_model = merge_functions.load_model(input_models)
 
             # TODO - run other LF if default fails
+            # Crosscheck replaced model outages with the latest UAP if at least one Baltic model was replaced
+            replaced_tso_list = [model['tso'] for model in merge_log['replaced_entity']]
+
+            if any(tso in ['LITGRID', 'AST', 'ELERING'] for tso in replaced_tso_list):
+                merged_model, merge_log = fix_model_outages(merged_model, replaced_tso_list, merge_log, scenario_datetime)
+
             # Various fixes from igmsshvscgmssh error
             if remove_non_generators_from_slack_participation:
                 network_pre_instance = merged_model["network"]
diff --git a/emf/loadflow_tool/model_merger/temporary_fixes.py b/emf/loadflow_tool/model_merger/temporary_fixes.py
index 6db94f3..257a397 100644
--- a/emf/loadflow_tool/model_merger/temporary_fixes.py
+++ b/emf/loadflow_tool/model_merger/temporary_fixes.py
@@ -1,4 +1,6 @@
 import triplets
+
+from emf.common.integrations import elastic
 from emf.loadflow_tool.helper import create_opdm_objects
 from emf.loadflow_tool.model_merger.merge_functions import (load_opdm_data, create_sv_and_updated_ssh, fix_sv_shunts,
                                                             fix_sv_tapsteps, remove_duplicate_sv_voltages,
@@ -7,6 +9,7 @@
                                                             export_to_cgmes_zip, set_brell_lines_to_zero_in_models,
                                                             configure_paired_boundarypoint_injections_by_nodes,
                                                             set_brell_lines_to_zero_in_models_new)
+from emf.loadflow_tool.model_merger.model_merger import logger
 
 
 def run_pre_merge_processing(input_models, merging_area):
@@ -60,3 +63,59 @@ def run_post_merge_processing(input_models, solved_model, task_properties, SMALL
     #sv_data, ssh_data = disconnect_equipment_if_flow_sum_not_zero(cgm_sv_data=sv_data, cgm_ssh_data=ssh_data, original_data=models_as_triplets) fix implemented in pypowsybl 1.8.1
 
     return sv_data, ssh_data
+
+
+def fix_model_outages(merged_model, replaced_model_list: list, merge_log, scenario_datetime):
+
+    area_map = {"LITGRID": "Lithuania", "AST": "Latvia", "ELERING": "Estonia"}
+    outage_areas = [area_map.get(item, item) for item in replaced_model_list]
+
+    elk_service = elastic.Elastic()
+
+    # Get outage eic-mrid mapping
+    mrid_map = elk_service.get_docs_by_query(index='config-network*', query={"match_all": {}}, size=10000)
+    mrid_map['mrid'] = mrid_map['mrid'].str.lstrip('_')
+
+    # Get latest UAP parse date
+    body = {"size": 1, "query": {"bool": {"must": [{"match": {"Merge": "Week"}}]}},
+            "sort": [{"reportParsedDate": {"order": "desc"}}], "fields": ["reportParsedDate"]}
+    last_uap_version = elk_service.client.search(index='opc-outages-baltics*', body=body)['hits']['hits'][0]['fields']['reportParsedDate'][0]
+
+    # Query for latest outage UAP
+    uap_query = {"bool": {"must": [{"match": {"reportParsedDate": f"{last_uap_version}"}},
+                                   {"match": {"Merge": "Week"}}]}}
+    uap_outages = elk_service.get_docs_by_query(index='opc-outages-baltics*', query=uap_query, size=10000)
+    uap_outages = uap_outages.merge(mrid_map[['eic', 'mrid']], how='left', on='eic').rename(columns={"mrid": 'grid_element_id'})
+
+    # Filter outages according to model scenario date and replaced area
+    filtered_outages = uap_outages[(uap_outages['start_date'] <= scenario_datetime) & (uap_outages['end_date'] >= scenario_datetime)]
+    filtered_outages = filtered_outages[filtered_outages['Area'].isin(outage_areas)]
+
+    mapped_outages = filtered_outages[~filtered_outages['grid_element_id'].isna()]
+    missing_outages = filtered_outages[filtered_outages['grid_element_id'].isna()]
+
+    if not missing_outages.empty:
+        logger.warning(f"Missing outage mRID(s): {missing_outages['name'].values}")
+
+    # TODO find reliable way to enable incorrectly disconnected outages
+    # model_outages = pd.DataFrame(get_model_outages(merged_model['network']))
+    # mapped = pd.merge(model_outages, mrid_map, left_on='grid_id', right_on='mrid', how='inner')
+
+    logger.info("Checking outages inside merged model:")
+    for index, outage in mapped_outages.iterrows():
+        try:
+            if merged_model['network'].disconnect(outage['grid_element_id']):
+                logger.info(f"Line {outage['name']} {outage['grid_element_id']} successfully disconnected")
+                merge_log.update({'outages_corrected': True})
+                merge_log.get('outages_applied').extend([{'name': outage['name'], 'grid_id': outage['grid_element_id'], "eic": outage['eic']}])
+            else:
+                if uap_outages['grid_element_id'].str.contains(outage['grid_element_id']).any():
+                    logger.info(f"Line {outage['name']} {outage['grid_element_id']} is already in outage")
+                else:
+                    logger.error(f"Failed to disconnect line: {outage['name']} {outage['grid_element_id']}")
+        except Exception as e:
+            logger.error((e, outage['name']))
+            merge_log.get('outages_unmapped').extend([{'name': outage['name'], 'grid_id': outage['grid_element_id'], "eic": outage['eic']}])
+            continue
+
+    return merged_model, merge_log

From 950c4c865628beb1015cc0786f20b907184f6548 Mon Sep 17 00:00:00 2001
From: "gintautas.poderys"
Date: Tue, 7 Jan 2025 18:07:15 +0200
Subject: [PATCH 2/3] updated outage crosscheck to include outage reconnection

---
 emf/loadflow_tool/helper.py         | 15 ++--
 .../model_merger/model_merger.py    |  4 +-
 .../model_merger/temporary_fixes.py | 69 ++++++++++++++----
 3 files changed, 63 insertions(+), 25 deletions(-)

diff --git a/emf/loadflow_tool/helper.py b/emf/loadflow_tool/helper.py
index 28915c4..33888ee 100644
--- a/emf/loadflow_tool/helper.py
+++ b/emf/loadflow_tool/helper.py
@@ -504,11 +504,18 @@ def export_model(network: pypowsybl.network, opdm_object_meta, profiles=None):
 
 
 def get_model_outages(network: pypowsybl.network):
+
     outage_log = []
-    lines = network.get_lines().reset_index(names=['grid_id'])
+    lines = network.get_elements(element_type=pypowsybl.network.ElementType.LINE, all_attributes=True).reset_index(names=['grid_id'])
+    _voltage_levels = network.get_voltage_levels(all_attributes=True).rename(columns={"name": "voltage_level_name"})
+    _substations = network.get_substations(all_attributes=True).rename(columns={"name": "substation_name"})
+    lines = lines.merge(_voltage_levels, left_on='voltage_level1_id', right_index=True, suffixes=(None, '_voltage_level'))
+    lines = lines.merge(_substations, left_on='substation_id', right_index=True, suffixes=(None, '_substation'))
     lines['element_type'] = 'Line'
+
     dlines = get_network_elements(network, pypowsybl.network.ElementType.DANGLING_LINE).reset_index(names=['grid_id'])
     dlines['element_type'] = 'Tieline'
+
     gens = get_network_elements(network, pypowsybl.network.ElementType.GENERATOR).reset_index(names=['grid_id'])
     gens['element_type'] = 'Generator'
 
@@ -516,8 +523,8 @@ def get_model_outages(network: pypowsybl.network):
     disconnected_dlines = dlines[dlines['connected'] == False]
     disconnected_gens = gens[gens['connected'] == False]
 
-    outage_log.extend(disconnected_lines[['grid_id', 'name', 'element_type']].to_dict('records'))
-    outage_log.extend(disconnected_dlines[['grid_id', 'name', 'element_type']].to_dict('records'))
-    outage_log.extend(disconnected_gens[['grid_id', 'name', 'element_type']].to_dict('records'))
+    outage_log.extend(disconnected_lines[['grid_id', 'name', 'element_type', 'country']].to_dict('records'))
+    outage_log.extend(disconnected_dlines[['grid_id', 'name', 'element_type', 'country']].to_dict('records'))
+    outage_log.extend(disconnected_gens[['grid_id', 'name', 'element_type', 'country']].to_dict('records'))
 
     return outage_log
 
diff --git a/emf/loadflow_tool/model_merger/model_merger.py b/emf/loadflow_tool/model_merger/model_merger.py
index 3fd2241..b3bc848 100644
--- a/emf/loadflow_tool/model_merger/model_merger.py
+++ b/emf/loadflow_tool/model_merger/model_merger.py
@@ -74,7 +74,7 @@ def handle(self, task_object: dict, **kwargs):
                      "replaced_entity": [],
                      "replacement_reason": [],
                      "outages_corrected": False,
-                     "outages_applied": [],
+                     "outage_fixes": [],
                      "outages_unmapped": []}
 
         # Parse relevant data from Task
@@ -229,7 +229,7 @@ def handle(self, task_object: dict, **kwargs):
             replaced_tso_list = [model['tso'] for model in merge_log['replaced_entity']]
 
             if any(tso in ['LITGRID', 'AST', 'ELERING'] for tso in replaced_tso_list):
-                merged_model, merge_log = fix_model_outages(merged_model, replaced_tso_list, merge_log, scenario_datetime)
+                merged_model, merge_log = fix_model_outages(merged_model, replaced_tso_list, merge_log, scenario_datetime, time_horizon)
 
             # Various fixes from igmsshvscgmssh error
             if remove_non_generators_from_slack_participation:
diff --git a/emf/loadflow_tool/model_merger/temporary_fixes.py b/emf/loadflow_tool/model_merger/temporary_fixes.py
index 257a397..46e7ac7 100644
--- a/emf/loadflow_tool/model_merger/temporary_fixes.py
+++ b/emf/loadflow_tool/model_merger/temporary_fixes.py
@@ -1,7 +1,9 @@
 import triplets
+import pandas as pd
+import logging
 
 from emf.common.integrations import elastic
-from emf.loadflow_tool.helper import create_opdm_objects
+from emf.loadflow_tool.helper import create_opdm_objects, get_model_outages
 from emf.loadflow_tool.model_merger.merge_functions import (load_opdm_data, create_sv_and_updated_ssh, fix_sv_shunts,
                                                             fix_sv_tapsteps, remove_duplicate_sv_voltages,
                                                             remove_small_islands,check_and_fix_dependencies,
@@ -9,7 +11,9 @@
                                                             export_to_cgmes_zip, set_brell_lines_to_zero_in_models,
                                                             configure_paired_boundarypoint_injections_by_nodes,
                                                             set_brell_lines_to_zero_in_models_new)
-from emf.loadflow_tool.model_merger.model_merger import logger
+
+
+logger = logging.getLogger(__name__)
 
 
 def run_pre_merge_processing(input_models, merging_area):
@@ -65,7 +69,7 @@ def run_post_merge_processing(input_models, solved_model, task_properties, SMALL
     return sv_data, ssh_data
 
 
-def fix_model_outages(merged_model, replaced_model_list: list, merge_log, scenario_datetime):
+def fix_model_outages(merged_model, replaced_model_list: list, merge_log, scenario_datetime, time_horizon):
 
     area_map = {"LITGRID": "Lithuania", "AST": "Latvia", "ELERING": "Estonia"}
     outage_areas = [area_map.get(item, item) for item in replaced_model_list]
@@ -77,45 +81,72 @@ def fix_model_outages(merged_model, replaced_model_list: list, merge_log, scenar
     mrid_map['mrid'] = mrid_map['mrid'].str.lstrip('_')
 
     # Get latest UAP parse date
-    body = {"size": 1, "query": {"bool": {"must": [{"match": {"Merge": "Week"}}]}},
+    if time_horizon == 'MO':
+        merge_type = "Month"
+    else:
+        merge_type = "Week"
+
+    body = {"size": 1, "query": {"bool": {"must": [{"match": {"Merge": merge_type}}]}},
             "sort": [{"reportParsedDate": {"order": "desc"}}], "fields": ["reportParsedDate"]}
     last_uap_version = elk_service.client.search(index='opc-outages-baltics*', body=body)['hits']['hits'][0]['fields']['reportParsedDate'][0]
 
     # Query for latest outage UAP
     uap_query = {"bool": {"must": [{"match": {"reportParsedDate": f"{last_uap_version}"}},
-                                   {"match": {"Merge": "Week"}}]}}
+                                   {"match": {"Merge": merge_type}}]}}
     uap_outages = elk_service.get_docs_by_query(index='opc-outages-baltics*', query=uap_query, size=10000)
-    uap_outages = uap_outages.merge(mrid_map[['eic', 'mrid']], how='left', on='eic').rename(columns={"mrid": 'grid_element_id'})
+    uap_outages = uap_outages.merge(mrid_map[['eic', 'mrid']], how='left', on='eic').rename(columns={"mrid": 'grid_id'})
 
     # Filter outages according to model scenario date and replaced area
     filtered_outages = uap_outages[(uap_outages['start_date'] <= scenario_datetime) & (uap_outages['end_date'] >= scenario_datetime)]
     filtered_outages = filtered_outages[filtered_outages['Area'].isin(outage_areas)]
 
-    mapped_outages = filtered_outages[~filtered_outages['grid_element_id'].isna()]
-    missing_outages = filtered_outages[filtered_outages['grid_element_id'].isna()]
+    mapped_outages = filtered_outages[~filtered_outages['grid_id'].isna()]
+    missing_outages = filtered_outages[filtered_outages['grid_id'].isna()]
 
     if not missing_outages.empty:
         logger.warning(f"Missing outage mRID(s): {missing_outages['name'].values}")
 
-    # TODO find reliable way to enable incorrectly disconnected outages
-    # model_outages = pd.DataFrame(get_model_outages(merged_model['network']))
-    # mapped = pd.merge(model_outages, mrid_map, left_on='grid_id', right_on='mrid', how='inner')
+    # Get outages already applied to the model
+    model_outages = pd.DataFrame(get_model_outages(merged_model['network']))
+    mapped_model_outages = pd.merge(model_outages, mrid_map, left_on='grid_id', right_on='mrid', how='inner')
+    model_area_map = {"LITGRID": "LT", "AST": "LV", "ELERING": "EE"}
+    model_outage_areas = [model_area_map.get(item, item) for item in replaced_model_list]
+    filtered_model_outages = mapped_model_outages[mapped_model_outages['country'].isin(model_outage_areas)]
+
+    logger.info("Fixing outages inside merged model:")
+
+    # Reconnecting outages from network-config list
+    for index, outage in filtered_model_outages.iterrows():
+        try:
+            if merged_model['network'].connect(outage['grid_id']):
+                logger.info(f"{outage['name']} {outage['grid_id']} successfully reconnected")
+                merge_log.update({'outages_corrected': True})
+                merge_log.get('outage_fixes').extend([{'name': outage['name'], 'grid_id': outage['grid_id'], "eic": outage['eic'], "outage_status": "connected"}])
+            else:
+                if uap_outages['grid_id'].str.contains(outage['grid_id']).any():
+                    logger.info(f"{outage['name']} {outage['grid_id']} is already connected")
+                else:
+                    logger.error(f"Failed to connect outage: {outage['name']} {outage['grid_id']}")
+        except Exception as e:
+            logger.error((e, outage['name']))
+            merge_log.get('outages_unmapped').extend([{'name': outage['name'], 'grid_id': outage['grid_id'], "eic": outage['eic']}])
+            continue
 
-    logger.info("Checking outages inside merged model:")
+    # Applying outages from UAP
     for index, outage in mapped_outages.iterrows():
         try:
-            if merged_model['network'].disconnect(outage['grid_element_id']):
-                logger.info(f"Line {outage['name']} {outage['grid_element_id']} successfully disconnected")
+            if merged_model['network'].disconnect(outage['grid_id']):
+                logger.info(f"{outage['name']} {outage['grid_id']} successfully disconnected")
                 merge_log.update({'outages_corrected': True})
-                merge_log.get('outages_applied').extend([{'name': outage['name'], 'grid_id': outage['grid_element_id'], "eic": outage['eic']}])
+                merge_log.get('outage_fixes').extend([{'name': outage['name'], 'grid_id': outage['grid_id'], "eic": outage['eic'], "outage_status": "disconnected"}])
             else:
-                if uap_outages['grid_element_id'].str.contains(outage['grid_element_id']).any():
-                    logger.info(f"Line {outage['name']} {outage['grid_element_id']} is already in outage")
+                if uap_outages['grid_id'].str.contains(outage['grid_id']).any():
+                    logger.info(f"{outage['name']} {outage['grid_id']} is already in outage")
                 else:
-                    logger.error(f"Failed to disconnect line: {outage['name']} {outage['grid_element_id']}")
+                    logger.error(f"Failed to disconnect outage: {outage['name']} {outage['grid_id']}")
         except Exception as e:
             logger.error((e, outage['name']))
-            merge_log.get('outages_unmapped').extend([{'name': outage['name'], 'grid_id': outage['grid_element_id'], "eic": outage['eic']}])
+            merge_log.get('outages_unmapped').extend([{'name': outage['name'], 'grid_id': outage['grid_id'], "eic": outage['eic']}])
             continue
 
     return merged_model, merge_log

From d10337710102bb27d5eb2e804b8d97aca40340d9 Mon Sep 17 00:00:00 2001
From: "gintautas.poderys"
Date: Wed, 8 Jan 2025 10:37:12 +0200
Subject: [PATCH 3/3] set outage fixing for BA merges only

---
 emf/loadflow_tool/model_merger/model_merger.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/emf/loadflow_tool/model_merger/model_merger.py b/emf/loadflow_tool/model_merger/model_merger.py
index b3bc848..4658677 100644
--- a/emf/loadflow_tool/model_merger/model_merger.py
+++ b/emf/loadflow_tool/model_merger/model_merger.py
@@ -228,7 +228,7 @@ def handle(self, task_object: dict, **kwargs):
             # Crosscheck replaced model outages with the latest UAP if at least one Baltic model was replaced
             replaced_tso_list = [model['tso'] for model in merge_log['replaced_entity']]
 
-            if any(tso in ['LITGRID', 'AST', 'ELERING'] for tso in replaced_tso_list):
+            if merging_area == 'BA' and any(tso in ['LITGRID', 'AST', 'ELERING'] for tso in replaced_tso_list):
                 merged_model, merge_log = fix_model_outages(merged_model, replaced_tso_list, merge_log, scenario_datetime, time_horizon)
 
             # Various fixes from igmsshvscgmssh error
@@ -439,13 +439,13 @@ def handle(self, task_object: dict, **kwargs):
         "job_period_start": "2024-05-24T22:00:00+00:00",
         "job_period_end": "2024-05-25T06:00:00+00:00",
         "task_properties": {
-            "timestamp_utc": "2024-12-16T08:30:00+00:00",
-            "merge_type": "EU",
+            "timestamp_utc": "2025-01-06T08:30:00+00:00",
+            "merge_type": "BA",
             "merging_entity": "BALTICRCC",
            "included": ['PSE', 'AST', 'ELERING'],
             "excluded": [],
             "local_import": ['LITGRID'],
-            "time_horizon": "1D",
+            "time_horizon": "2D",
             "version": "99",
             "mas": "http://www.baltic-rsc.eu/OperationalPlanning",
             "pre_temp_fixes": "True",
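
Note for reviewers (illustration only, not part of the patch series): the UAP crosscheck in fix_model_outages() can be exercised without Elasticsearch or a real CGM. The sketch below uses fabricated outage rows and a toy stand-in for merged_model['network']; ToyNetwork, the names, EICs and mRIDs are all invented for the example. Its connect()/disconnect() mimic the behaviour the patches rely on, returning False when the element is already in the requested state, which is what drives the "already in outage" / "already connected" branches.

import pandas as pd


class ToyNetwork:
    # Stand-in for merged_model['network']: connect()/disconnect() return False
    # when the element is already in the requested state, True otherwise.
    def __init__(self, disconnected_ids):
        self.disconnected = set(disconnected_ids)

    def disconnect(self, grid_id):
        if grid_id in self.disconnected:
            return False
        self.disconnected.add(grid_id)
        return True

    def connect(self, grid_id):
        if grid_id not in self.disconnected:
            return False
        self.disconnected.discard(grid_id)
        return True


# Fabricated UAP rows, already joined to the eic->mrid map; 'grid_id' is None
# where the mapping is missing - the case the patches only log as a warning.
uap_outages = pd.DataFrame({
    "name": ["Line A", "Line B"],
    "eic": ["EIC-A", "EIC-B"],
    "grid_id": ["uuid-a", None],
    "Area": ["Lithuania", "Latvia"],
    "start_date": ["2025-01-05T00:00:00", "2025-01-05T00:00:00"],
    "end_date": ["2025-01-12T00:00:00", "2025-01-12T00:00:00"],
})
scenario_datetime = "2025-01-06T08:30:00"

# Same window and mapping split as fix_model_outages(): keep outages whose
# start/end span the scenario timestamp, then separate mapped from unmapped.
active = uap_outages[(uap_outages["start_date"] <= scenario_datetime)
                     & (uap_outages["end_date"] >= scenario_datetime)]
mapped = active[~active["grid_id"].isna()]    # can be applied to the network
missing = active[active["grid_id"].isna()]    # unmapped, only reported

network = ToyNetwork(disconnected_ids=["uuid-stale"])  # leftover outage from a replaced IGM
for _, outage in mapped.iterrows():
    print(outage["name"], "disconnected:", network.disconnect(outage["grid_id"]))
print("Unmapped UAP outages:", missing["name"].tolist())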