diff --git a/fabric_cf/__init__.py b/fabric_cf/__init__.py index 5f2ae615..96e7bfe8 100644 --- a/fabric_cf/__init__.py +++ b/fabric_cf/__init__.py @@ -1,2 +1,2 @@ -__version__ = "1.7.0b1" +__version__ = "1.7.0rc0" __VERSION__ = __version__ diff --git a/fabric_cf/actor/core/apis/abc_actor_management_object.py b/fabric_cf/actor/core/apis/abc_actor_management_object.py index 80798567..9e8a807f 100644 --- a/fabric_cf/actor/core/apis/abc_actor_management_object.py +++ b/fabric_cf/actor/core/apis/abc_actor_management_object.py @@ -238,7 +238,8 @@ def get_sites(self, *, caller: AuthToken, site: str) -> ResultSitesAvro: def get_reservations(self, *, caller: AuthToken, states: List[int] = None, slice_id: ID = None, rid: ID = None, oidc_claim_sub: str = None, email: str = None, rid_list: List[str] = None, type: str = None, - site: str = None, node_id: str = None) -> ResultReservationAvro: + site: str = None, node_id: str = None, + host: str = None, ip_subnet: str = None) -> ResultReservationAvro: """ Get Reservations @param states states @@ -252,6 +253,9 @@ def get_reservations(self, *, caller: AuthToken, states: List[int] = None, @param node_id node id Obtains all reservations with error information in case of failure @param caller caller + @param host host + @param ip_subnet ip subnet + @return returns list of the reservations """ diff --git a/fabric_cf/actor/core/apis/abc_actor_mixin.py b/fabric_cf/actor/core/apis/abc_actor_mixin.py index f6183c64..68453c72 100644 --- a/fabric_cf/actor/core/apis/abc_actor_mixin.py +++ b/fabric_cf/actor/core/apis/abc_actor_mixin.py @@ -38,6 +38,7 @@ from fabric_cf.actor.core.apis.abc_tick import ABCTick if TYPE_CHECKING: + from fabric_cf.actor.core.kernel.slice_state_machine import SliceState from fabric_cf.actor.core.apis.abc_actor_event import ABCActorEvent from fabric_cf.actor.core.apis.abc_actor_proxy import ABCActorProxy from fabric_cf.actor.core.apis.abc_base_plugin import ABCBasePlugin @@ -431,12 +432,13 @@ def register_slice(self, *, slice_object: ABCSlice): """ @abstractmethod - def modify_slice(self, *, slice_object: ABCSlice): + def modify_slice(self, *, slice_object: ABCSlice, new_state: SliceState): """ Modify the slice registered with the actor. Moves the slice into Modifying State Args: slice_object: slice_object + new_state: new_state Raises: Exception in case of error """ diff --git a/fabric_cf/actor/core/apis/abc_database.py b/fabric_cf/actor/core/apis/abc_database.py index 0d64f244..fd6a0245 100644 --- a/fabric_cf/actor/core/apis/abc_database.py +++ b/fabric_cf/actor/core/apis/abc_database.py @@ -161,7 +161,7 @@ def update_slice(self, *, slice_object: ABCSlice): def get_reservations(self, *, slice_id: ID = None, graph_node_id: str = None, project_id: str = None, email: str = None, oidc_sub: str = None, rid: ID = None, states: list[int] = None, site: str = None, rsv_type: list[str] = None, start: datetime = None, - end: datetime = None) -> List[ABCReservationMixin]: + end: datetime = None, ip_subnet: str = None, host: str = None) -> List[ABCReservationMixin]: """ Retrieves the reservations. @@ -172,7 +172,8 @@ def get_reservations(self, *, slice_id: ID = None, graph_node_id: str = None, pr @abstractmethod def get_components(self, *, node_id: str, states: list[int], rsv_type: list[str], component: str = None, - bdf: str = None, start: datetime = None, end: datetime = None) -> Dict[str, List[str]]: + bdf: str = None, start: datetime = None, end: datetime = None, + excludes: List[str] = None) -> Dict[str, List[str]]: """ Returns components matching the search criteria @param node_id: Worker Node ID to which components belong @@ -182,6 +183,7 @@ def get_components(self, *, node_id: str, states: list[int], rsv_type: list[str] @param bdf: Component's PCI address @param start: start time @param end: end time + @param excludes: Excludes the list of reservations NOTE# For P4 switches; node_id=node+renc-p4-sw component=ip+192.168.11.8 bdf=p1 diff --git a/fabric_cf/actor/core/apis/abc_mgmt_actor.py b/fabric_cf/actor/core/apis/abc_mgmt_actor.py index 33b7012b..daf17a32 100644 --- a/fabric_cf/actor/core/apis/abc_mgmt_actor.py +++ b/fabric_cf/actor/core/apis/abc_mgmt_actor.py @@ -29,7 +29,6 @@ from typing import TYPE_CHECKING, List, Tuple, Dict from fabric_mb.message_bus.messages.delegation_avro import DelegationAvro -from fabric_mb.message_bus.messages.poa_avro import PoaAvro from fabric_mb.message_bus.messages.poa_info_avro import PoaInfoAvro from fabric_mb.message_bus.messages.site_avro import SiteAvro @@ -151,7 +150,8 @@ def accept_update_slice(self, *, slice_id: ID) -> bool: @abstractmethod def get_reservations(self, *, states: List[int] = None, slice_id: ID = None, rid: ID = None, oidc_claim_sub: str = None, email: str = None, rid_list: List[str] = None, - type: str = None, site: str = None, node_id: str = None) -> List[ReservationMng]: + type: str = None, site: str = None, node_id: str = None, + host: str = None, ip_subnet: str = None) -> List[ReservationMng]: """ Get Reservations @param states states @@ -163,6 +163,8 @@ def get_reservations(self, *, states: List[int] = None, slice_id: ID = None, @param type type of reservations like NodeSliver/NetworkServiceSliver @param site site @param node_id node id + @param ip_subnet ip subnet + @param host host Obtains all reservations @return returns list of the reservations """ diff --git a/fabric_cf/actor/core/apis/abc_reservation_mixin.py b/fabric_cf/actor/core/apis/abc_reservation_mixin.py index 72a55180..04a2a480 100644 --- a/fabric_cf/actor/core/apis/abc_reservation_mixin.py +++ b/fabric_cf/actor/core/apis/abc_reservation_mixin.py @@ -29,6 +29,8 @@ from enum import Enum from typing import TYPE_CHECKING +from fabric_cf.actor.core.time.term import Term + from fabric_cf.actor.core.apis.abc_reservation_resources import ABCReservationResources from fabric_cf.actor.core.apis.abc_reservation_status import ABCReservationStatus @@ -541,5 +543,13 @@ def poa_info(self, *, incoming: Poa): """ Process POA response + @throws Exception in case of error + """ + + @abstractmethod + def get_term(self) -> Term: + """ + Return Tem + @throws Exception in case of error """ \ No newline at end of file diff --git a/fabric_cf/actor/core/common/event_logger.py b/fabric_cf/actor/core/common/event_logger.py index 79047fc7..9908fda0 100644 --- a/fabric_cf/actor/core/common/event_logger.py +++ b/fabric_cf/actor/core/common/event_logger.py @@ -29,6 +29,7 @@ from fabric_mb.message_bus.messages.slice_avro import SliceAvro from fim.logging.log_collector import LogCollector from fim.slivers.base_sliver import BaseSliver +from fim.slivers.network_node import NodeSliver from fim.user.topology import ExperimentTopology from fabric_cf.actor.core.common.constants import Constants @@ -98,7 +99,7 @@ def log_sliver_event(self, *, slice_object: SliceAvro, sliver: BaseSliver, verb: owner = slice_object.get_owner() log_message = f"CFEL Sliver event slc:{slice_object.get_slice_id()} " \ - f"slvr:{sliver.get_reservation_info().reservation_id} of " \ + f"slvr:{sliver.get_reservation_info().reservation_id}/{sliver.get_name()} of " \ f"type {sliver.get_type()} {verb} " \ f"by prj:{slice_object.get_project_id()} usr:{owner.get_oidc_sub_claim()}" \ f":{owner.get_email()}" @@ -112,6 +113,9 @@ def log_sliver_event(self, *, slice_object: SliceAvro, sliver: BaseSliver, verb: log_message += f" {str(lc)}" + if isinstance(sliver, NodeSliver) and sliver.get_image_ref(): + log_message += f" image:{sliver.get_image_ref()}" + self.logger.info(log_message) except Exception as e: traceback.print_exc() diff --git a/fabric_cf/actor/core/core/actor.py b/fabric_cf/actor/core/core/actor.py index 414f9d26..95a79702 100644 --- a/fabric_cf/actor/core/core/actor.py +++ b/fabric_cf/actor/core/core/actor.py @@ -658,8 +658,8 @@ def register(self, *, reservation: ABCReservationMixin): def register_slice(self, *, slice_object: ABCSlice): self.wrapper.register_slice(slice_object=slice_object) - def modify_slice(self, *, slice_object: ABCSlice): - self.wrapper.modify_slice(slice_object=slice_object) + def modify_slice(self, *, slice_object: ABCSlice, new_state: SliceState): + self.wrapper.modify_slice(slice_object=slice_object, new_state=new_state) def delete_slice(self, *, slice_id: ID): self.wrapper.delete_slice(slice_id=slice_id) diff --git a/fabric_cf/actor/core/kernel/broker_reservation.py b/fabric_cf/actor/core/kernel/broker_reservation.py index 24976d7b..c54538d8 100644 --- a/fabric_cf/actor/core/kernel/broker_reservation.py +++ b/fabric_cf/actor/core/kernel/broker_reservation.py @@ -319,6 +319,8 @@ def probe_pending(self): self.transition(prefix="Recover from Extend Failure", state=ReservationStates.Ticketed, pending=ReservationPendingStates.None_) self.extend_failure = False + self.update_data.clear(clear_fail=True) + self.error_message = "" else: if self.pending_state == ReservationPendingStates.Ticketing: # Check for a pending ticket operation that may have completed @@ -559,7 +561,9 @@ def handle_failed_rpc(self, *, failed: FailedRPC): super().handle_failed_rpc(failed=failed) def fail_extend(self, *, message: str, exception: Exception = None): + self.logger.debug(f"Failed Extend: {message}") self.extend_failure = True + self.notified_failed = False super().fail(message=message, exception=exception) diff --git a/fabric_cf/actor/core/kernel/kernel.py b/fabric_cf/actor/core/kernel/kernel.py index 8cd184e2..4d15883e 100644 --- a/fabric_cf/actor/core/kernel/kernel.py +++ b/fabric_cf/actor/core/kernel/kernel.py @@ -872,10 +872,11 @@ def register_reservation(self, *, reservation: ABCReservationMixin): finally: reservation.unlock() - def modify_slice(self, *, slice_object: ABCSlice): + def modify_slice(self, *, slice_object: ABCSlice, new_state: SliceState): """ Modify the specified slice with the kernel. @param slice_object slice_object + @param new_state new_state @throws Exception in case of failure """ if slice_object is None: @@ -891,7 +892,11 @@ def modify_slice(self, *, slice_object: ABCSlice): if not real.is_dead_or_closing(): real.set_config_properties(value=slice_object.get_config_properties()) # Transition slice to Configuring state - real.transition_slice(operation=SliceStateMachine.MODIFY) + if new_state == SliceState.Modifying: + operation = SliceStateMachine.MODIFY + else: + operation = SliceStateMachine.RENEW + real.transition_slice(operation=operation) real.set_graph_id(graph_id=slice_object.get_graph_id()) real.set_dirty() self.plugin.get_database().update_slice(slice_object=real) diff --git a/fabric_cf/actor/core/kernel/kernel_wrapper.py b/fabric_cf/actor/core/kernel/kernel_wrapper.py index 86c8166b..ca01f761 100644 --- a/fabric_cf/actor/core/kernel/kernel_wrapper.py +++ b/fabric_cf/actor/core/kernel/kernel_wrapper.py @@ -28,6 +28,7 @@ from datetime import datetime from typing import List, Dict +from fabric_cf.actor.core.kernel.slice_state_machine import SliceState from fabric_mb.message_bus.messages.poa_info_avro import PoaInfoAvro from fim.slivers.base_sliver import BaseSliver @@ -769,16 +770,17 @@ def register_delegation(self, *, delegation: ABCDelegation): self.kernel.register_delegation(delegation=delegation) - def modify_slice(self, *, slice_object: ABCSlice): + def modify_slice(self, *, slice_object: ABCSlice, new_state: SliceState): """ Modify the slice registered with the kernel @param slice_object slice_object + @param new_state new_state @throws Exception in case of error """ if slice_object is None or slice_object.get_slice_id() is None or not isinstance(slice_object, ABCSlice): raise KernelException("Invalid argument {}".format(slice_object)) - self.kernel.modify_slice(slice_object=slice_object) + self.kernel.modify_slice(slice_object=slice_object, new_state=new_state) def delete_slice(self, *, slice_id: ID): """ diff --git a/fabric_cf/actor/core/kernel/reservation_client.py b/fabric_cf/actor/core/kernel/reservation_client.py index 060f6ed7..7e0769a9 100644 --- a/fabric_cf/actor/core/kernel/reservation_client.py +++ b/fabric_cf/actor/core/kernel/reservation_client.py @@ -305,6 +305,12 @@ def absorb_ticket_update(self, *, incoming: ABCReservationMixin, update_data: Up self.resources.update(reservation=self, resource_set=incoming.get_resources()) self.logger.debug("absorb_update: {}".format(incoming)) + # Clear error message from previous Extend operations + self.error_message = "" + + #if self.resources.get_sliver().reservation_info: + # self.resources.get_sliver().reservation_info.error_message = self.error_message + self.policy.update_ticket_complete(reservation=self) def accept_lease_update(self, *, incoming: ABCReservationMixin, update_data: UpdateData) -> bool: @@ -623,9 +629,9 @@ def can_renew(self) -> bool: return self.last_ticket_update.successful() - def clear_notice(self, clear_fail: bool=False): - self.last_ticket_update.clear() - self.last_lease_update.clear() + def clear_notice(self, clear_fail: bool = False): + self.last_ticket_update.clear(clear_fail=clear_fail) + self.last_lease_update.clear(clear_fail=clear_fail) def do_relinquish(self): """ @@ -880,9 +886,11 @@ def get_last_ticket_update(self) -> str: if self.last_ticket_update is not None: if self.last_ticket_update.get_message() is not None and self.last_ticket_update.get_message() != "": result += f"{self.last_ticket_update.get_message()}, " - ev = self.last_ticket_update.get_events() - if ev is not None and ev != "": - result += f"events: {ev}, " + # Include events only in case of failure + if self.last_ticket_update.is_failed(): + ev = self.last_ticket_update.get_events() + if ev is not None and ev != "": + result += f"events: {ev}, " result = result[:-2] return result @@ -891,9 +899,11 @@ def get_last_lease_update(self) -> str: if self.last_lease_update is not None: if self.last_lease_update.get_message() is not None and self.last_lease_update.get_message() != "": result += f"{self.last_lease_update.get_message()}, " - ev = self.last_lease_update.get_events() - if ev is not None and ev != "": - result += f"events: {ev}, " + # Include events only in case of failure + if self.last_lease_update.is_failed(): + ev = self.last_lease_update.get_events() + if ev is not None and ev != "": + result += f"events: {ev}, " result = result[:-2] return result diff --git a/fabric_cf/actor/core/kernel/reservation_states.py b/fabric_cf/actor/core/kernel/reservation_states.py index 5d471fb5..a2aa8b7c 100644 --- a/fabric_cf/actor/core/kernel/reservation_states.py +++ b/fabric_cf/actor/core/kernel/reservation_states.py @@ -23,9 +23,22 @@ # # # Author: Komal Thareja (kthare10@renci.org) +import enum from enum import Enum +class ReservationOperation(enum.Enum): + Create = enum.auto(), + Modify = enum.auto(), + Extend = enum.auto() + + def __repr__(self): + return self.name + + def __str__(self): + return self.name + + class ReservationStates(Enum): """ Reservation states @@ -63,7 +76,7 @@ def translate(state_name: str): elif state_name.lower() == ReservationStates.Failed.name.lower(): return ReservationStates.Failed elif state_name.lower() == ReservationStates.CloseFail.name.lower(): - return ReservationStates.Failed + return ReservationStates.CloseFail else: return ReservationStates.Unknown diff --git a/fabric_cf/actor/core/kernel/slice.py b/fabric_cf/actor/core/kernel/slice.py index c9d92356..593dda76 100644 --- a/fabric_cf/actor/core/kernel/slice.py +++ b/fabric_cf/actor/core/kernel/slice.py @@ -268,7 +268,18 @@ def is_dirty(self) -> bool: return self.dirty def transition_slice(self, *, operation: SliceOperation) -> Tuple[bool, SliceState]: - return self.state_machine.transition_slice(operation=operation, reservations=self.reservations) + status, new_state = self.state_machine.transition_slice(operation=operation, reservations=self.reservations) + if status and new_state in [SliceState.StableOK, SliceState.StableError]: + new_end = None + for r in self.reservations.values(): + term = r.get_term() + if not new_end or (term and term.get_end_time() > new_end): + new_end = term.get_end_time() + + if new_end: + self.lease_end = new_end + + return status, new_state def is_stable_ok(self) -> bool: state_changed, slice_state = self.transition_slice(operation=SliceStateMachine.REEVALUATE) diff --git a/fabric_cf/actor/core/kernel/slice_state_machine.py b/fabric_cf/actor/core/kernel/slice_state_machine.py index de19811a..fa2d7e70 100644 --- a/fabric_cf/actor/core/kernel/slice_state_machine.py +++ b/fabric_cf/actor/core/kernel/slice_state_machine.py @@ -139,6 +139,7 @@ def is_modified(*, state) -> bool: class SliceCommand(Enum): Create = enum.auto() Modify = enum.auto() + Renew = enum.auto() Delete = enum.auto() Reevaluate = enum.auto() ModifyAccept = enum.auto() @@ -198,6 +199,9 @@ class SliceStateMachine: MODIFY = SliceOperation(SliceCommand.Modify, SliceState.StableOK, SliceState.StableError, SliceState.Configuring, SliceState.AllocatedOK, SliceState.AllocatedError) + RENEW = SliceOperation(SliceCommand.Renew, SliceState.StableOK, SliceState.StableError, SliceState.AllocatedOK, + SliceState.ModifyOK, SliceState.ModifyError, SliceState.AllocatedError) + MODIFY_ACCEPT = SliceOperation(SliceCommand.ModifyAccept, SliceState.ModifyOK, SliceState.ModifyError, SliceState.Modifying, SliceState.AllocatedOK, SliceState.AllocatedError) @@ -247,6 +251,9 @@ def transition_slice(self, *, operation: SliceOperation, reservations: Reservati if operation.command == SliceCommand.Create: self.state = SliceState.Configuring + elif operation.command == SliceCommand.Renew: + self.state = SliceState.Configuring + elif operation.command == SliceCommand.Modify: self.state = SliceState.Modifying diff --git a/fabric_cf/actor/core/manage/actor_management_object.py b/fabric_cf/actor/core/manage/actor_management_object.py index f6e208c4..8f304837 100644 --- a/fabric_cf/actor/core/manage/actor_management_object.py +++ b/fabric_cf/actor/core/manage/actor_management_object.py @@ -347,7 +347,8 @@ def run(self): try: if modify_state: slice_object = Translate.translate_slice(slice_avro=slice_mng) - self.actor.modify_slice(slice_object=slice_object) + from fabric_cf.actor.core.kernel.slice_state_machine import SliceState + self.actor.modify_slice(slice_object=slice_object, new_state=SliceState(slice_mng.get_state())) else: slice_obj = self.actor.get_slice(slice_id=slice_id) if slice_obj is None: @@ -429,7 +430,8 @@ def get_sites(self, *, caller: AuthToken, site: str) -> ResultSitesAvro: def get_reservations(self, *, caller: AuthToken, states: List[int] = None, slice_id: ID = None, rid: ID = None, oidc_claim_sub: str = None, email: str = None, rid_list: List[str] = None, type: str = None, - site: str = None, node_id: str = None) -> ResultReservationAvro: + site: str = None, node_id: str = None, host: str = None, + ip_subnet: str = None) -> ResultReservationAvro: result = ResultReservationAvro() result.status = ResultAvro() @@ -449,7 +451,7 @@ def get_reservations(self, *, caller: AuthToken, states: List[int] = None, else: res_list = self.db.get_reservations(slice_id=slice_id, rid=rid, email=email, states=states, rsv_type=rsv_type, site=site, - graph_node_id=node_id) + graph_node_id=node_id, host=host, ip_subnet=ip_subnet) except Exception as e: self.logger.error("getReservations:db access {}".format(e)) result.status.set_code(ErrorCodes.ErrorDatabaseError.value) diff --git a/fabric_cf/actor/core/manage/client_actor_management_object_helper.py b/fabric_cf/actor/core/manage/client_actor_management_object_helper.py index c40b327c..662b5608 100644 --- a/fabric_cf/actor/core/manage/client_actor_management_object_helper.py +++ b/fabric_cf/actor/core/manage/client_actor_management_object_helper.py @@ -462,13 +462,16 @@ def run(self): dependencies=redeem_dep_res_list) return result - + ''' # Process Extend for Renew synchronously if new_end_time is not None: result = self.client.execute_on_actor_thread_and_wait(runnable=Runner(actor=self.client)) # Process Extend for Modify asynchronously else: self.client.execute_on_actor_thread(runnable=Runner(actor=self.client)) + ''' + # Always Process Extend asynchronously + self.client.execute_on_actor_thread(runnable=Runner(actor=self.client)) except Exception as e: self.logger.error("extend_reservation {}".format(e)) diff --git a/fabric_cf/actor/core/manage/kafka/kafka_actor.py b/fabric_cf/actor/core/manage/kafka/kafka_actor.py index 54119ded..129d7ee5 100644 --- a/fabric_cf/actor/core/manage/kafka/kafka_actor.py +++ b/fabric_cf/actor/core/manage/kafka/kafka_actor.py @@ -33,7 +33,6 @@ from fabric_mb.message_bus.messages.get_delegations_avro import GetDelegationsAvro from fabric_mb.message_bus.messages.get_sites_request_avro import GetSitesRequestAvro from fabric_mb.message_bus.messages.maintenance_request_avro import MaintenanceRequestAvro -from fabric_mb.message_bus.messages.poa_avro import PoaAvro from fabric_mb.message_bus.messages.poa_info_avro import PoaInfoAvro from fabric_mb.message_bus.messages.remove_delegation_avro import RemoveDelegationAvro from fabric_mb.message_bus.messages.reservation_state_avro import ReservationStateAvro @@ -132,11 +131,12 @@ def delete_slice(self, *, slice_id: ID) -> bool: def get_reservations(self, *, states: List[int] = None, slice_id: ID = None, rid: ID = None, oidc_claim_sub: str = None, email: str = None, rid_list: List[str] = None, - type: str = None, site: str = None, node_id: str = None) -> List[ReservationMng]: + type: str = None, site: str = None, node_id: str = None, + host: str = None, ip_subnet: str = None) -> List[ReservationMng]: request = GetReservationsRequestAvro() request = self.fill_request_by_id_message(request=request, slice_id=slice_id, states=states, email=email, rid=rid, - type=type, site=site) + type=type, site=site, host=host, ip_subnet=ip_subnet) status, response = self.send_request(request) if status.code == 0: diff --git a/fabric_cf/actor/core/manage/kafka/kafka_proxy.py b/fabric_cf/actor/core/manage/kafka/kafka_proxy.py index c6bc9501..a45c76cc 100644 --- a/fabric_cf/actor/core/manage/kafka/kafka_proxy.py +++ b/fabric_cf/actor/core/manage/kafka/kafka_proxy.py @@ -93,7 +93,7 @@ def get_type_id(self) -> str: def fill_request_by_id_message(self, request: RequestByIdRecord, id_token: str = None, email: str = None, slice_id: ID = None, slice_name: str = None, states: List[int] = None, rid: ID = None, delegation_id: str = None, broker_id: ID = None, type: str = None, - site: str = None): + site: str = None, host: str = None, ip_subnet: str = None): request.guid = str(self.management_id) request.auth = self.auth request.callback_topic = self.callback_topic @@ -104,6 +104,8 @@ def fill_request_by_id_message(self, request: RequestByIdRecord, id_token: str = request.delegation_id = delegation_id request.site = site request.type = type + request.ip_subnet = ip_subnet + request.host = host if slice_id is not None: request.slice_id = str(slice_id) if rid is not None: diff --git a/fabric_cf/actor/core/manage/local/local_actor.py b/fabric_cf/actor/core/manage/local/local_actor.py index cdeea385..ae80b11e 100644 --- a/fabric_cf/actor/core/manage/local/local_actor.py +++ b/fabric_cf/actor/core/manage/local/local_actor.py @@ -110,12 +110,14 @@ def remove_slice(self, *, slice_id: ID) -> bool: def get_reservations(self, *, states: List[int] = None, slice_id: ID = None, rid: ID = None, oidc_claim_sub: str = None, email: str = None, rid_list: List[str] = None, - type: str = None, site: str = None, node_id: str = None) -> List[ReservationMng]: + type: str = None, site: str = None, node_id: str = None, + host: str = None, ip_subnet: str = None) -> List[ReservationMng]: self.clear_last() try: result = self.manager.get_reservations(caller=self.auth, states=states, slice_id=slice_id, rid=rid, oidc_claim_sub=oidc_claim_sub, email=email, rid_list=rid_list, - type=type, site=site, node_id=node_id) + type=type, site=site, node_id=node_id, host=host, + ip_subnet=ip_subnet) self.last_status = result.status if result.status.get_code() == 0: diff --git a/fabric_cf/actor/core/plugins/db/actor_database.py b/fabric_cf/actor/core/plugins/db/actor_database.py index 49c4c435..d2e5604a 100644 --- a/fabric_cf/actor/core/plugins/db/actor_database.py +++ b/fabric_cf/actor/core/plugins/db/actor_database.py @@ -285,12 +285,19 @@ def add_reservation(self, *, reservation: ABCReservationMixin): site = None rsv_type = None components = None + host = None + ip_subnet = None if reservation.get_resources() is not None and reservation.get_resources().get_sliver() is not None: sliver = reservation.get_resources().get_sliver() site = sliver.get_site() rsv_type = sliver.get_type().name from fim.slivers.network_service import NetworkServiceSliver + from fim.slivers.network_node import NodeSliver + if isinstance(sliver, NetworkServiceSliver) and sliver.interface_info: + if sliver.get_gateway(): + ip_subnet = sliver.get_gateway().subnet + components = [] for interface in sliver.interface_info.interfaces.values(): graph_id_node_id_component_id, bqm_if_name = interface.get_node_map() @@ -304,6 +311,26 @@ def add_reservation(self, *, reservation: ABCReservationMixin): bdf = ":".join(split_string[3:]) if len(split_string) > 3 else None if node_id and comp_id and bdf: components.append((node_id, comp_id, bdf)) + elif isinstance(sliver, NodeSliver) and sliver.attached_components_info: + if sliver.get_labels() and sliver.get_labels().instance_parent: + host = sliver.get_labels().instance_parent + if sliver.get_label_allocations() and sliver.get_label_allocations().instance_parent: + host = sliver.get_label_allocations().instance_parent + if sliver.get_management_ip(): + ip_subnet = sliver.get_management_ip() + + node_id = reservation.get_graph_node_id() + if node_id: + components = [] + for c in sliver.attached_components_info.devices.values(): + if c.get_node_map(): + bqm_id, comp_id = c.get_node_map() + if c.labels and c.labels.bdf: + bdf = c.labels.bdf + if isinstance(c.labels.bdf, str): + bdf = [c.labels.bdf] + for x in bdf: + components.append((node_id, comp_id, x)) term = reservation.get_term() @@ -318,7 +345,8 @@ def add_reservation(self, *, reservation: ABCReservationMixin): oidc_claim_sub=oidc_claim_sub, email=email, site=site, rsv_type=rsv_type, components=components, lease_start=term.get_start_time() if term else None, - lease_end=term.get_end_time() if term else None) + lease_end=term.get_end_time() if term else None, + host=host, ip_subnet=ip_subnet) self.logger.debug( "Reservation {} added to slice {}".format(reservation.get_reservation_id(), reservation.get_slice())) finally: @@ -338,12 +366,19 @@ def update_reservation(self, *, reservation: ABCReservationMixin): site = None rsv_type = None components = None + ip_subnet = None + host = None + if reservation.get_resources() is not None and reservation.get_resources().get_sliver() is not None: sliver = reservation.get_resources().get_sliver() site = sliver.get_site() rsv_type = sliver.get_type().name from fim.slivers.network_service import NetworkServiceSliver + from fim.slivers.network_node import NodeSliver if isinstance(sliver, NetworkServiceSliver) and sliver.interface_info: + if sliver.get_gateway(): + ip_subnet = sliver.get_gateway().subnet + components = [] for interface in sliver.interface_info.interfaces.values(): graph_id_node_id_component_id, bqm_if_name = interface.get_node_map() @@ -357,6 +392,24 @@ def update_reservation(self, *, reservation: ABCReservationMixin): bdf = ":".join(split_string[3:]) if len(split_string) > 3 else None if node_id and comp_id and bdf: components.append((node_id, comp_id, bdf)) + elif isinstance(sliver, NodeSliver) and sliver.attached_components_info: + if sliver.get_labels() and sliver.get_labels().instance_parent: + host = sliver.get_labels().instance_parent + if sliver.get_label_allocations() and sliver.get_label_allocations().instance_parent: + host = sliver.get_label_allocations().instance_parent + ip_subnet = sliver.get_management_ip() + node_id = reservation.get_graph_node_id() + if node_id: + components = [] + for c in sliver.attached_components_info.devices.values(): + if c.get_node_map(): + bqm_id, comp_id = c.get_node_map() + if c.labels and c.labels.bdf: + bdf = c.labels.bdf + if isinstance(c.labels.bdf, str): + bdf = [c.labels.bdf] + for x in bdf: + components.append((node_id, comp_id, x)) term = reservation.get_term() begin = time.time() @@ -375,7 +428,8 @@ def update_reservation(self, *, reservation: ABCReservationMixin): rsv_graph_node_id=reservation.get_graph_node_id(), site=site, rsv_type=rsv_type, components=components, lease_start=term.get_start_time() if term else None, - lease_end=term.get_end_time() if term else None) + lease_end=term.get_end_time() if term else None, + ip_subnet=ip_subnet, host=host) diff = int(time.time() - begin) if diff > 0: self.logger.info(f"DB TIME: {diff}") @@ -512,10 +566,11 @@ def get_authority_reservations(self) -> List[ABCReservationMixin]: return result def get_components(self, *, node_id: str, states: list[int], rsv_type: list[str], component: str = None, - bdf: str = None, start: datetime = None, end: datetime = None) -> Dict[str, List[str]]: + bdf: str = None, start: datetime = None, end: datetime = None, + excludes: List[str] = None) -> Dict[str, List[str]]: try: return self.db.get_components(node_id=node_id, states=states, component=component, bdf=bdf, - rsv_type=rsv_type, start=start, end=end) + rsv_type=rsv_type, start=start, end=end, excludes=excludes) except Exception as e: self.logger.error(e) finally: @@ -525,13 +580,13 @@ def get_components(self, *, node_id: str, states: list[int], rsv_type: list[str] def get_reservations(self, *, slice_id: ID = None, graph_node_id: str = None, project_id: str = None, email: str = None, oidc_sub: str = None, rid: ID = None, states: list[int] = None, site: str = None, rsv_type: list[str] = None, start: datetime = None, - end: datetime = None) -> List[ABCReservationMixin]: + end: datetime = None, ip_subnet: str = None, host: str = None) -> List[ABCReservationMixin]: result = [] try: #self.lock.acquire() sid = str(slice_id) if slice_id is not None else None res_id = str(rid) if rid is not None else None - res_dict_list = self.db.get_reservations(slice_id=sid, graph_node_id=graph_node_id, + res_dict_list = self.db.get_reservations(slice_id=sid, graph_node_id=graph_node_id, host=host, ip_subnet=ip_subnet, project_id=project_id, email=email, oidc_sub=oidc_sub, rid=res_id, states=states, site=site, rsv_type=rsv_type, start=start, end=end) if self.lock.locked(): diff --git a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py index 353dc30e..bac382bb 100644 --- a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py +++ b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py @@ -52,7 +52,7 @@ from fabric_cf.actor.core.container.maintenance import Maintenance from fabric_cf.actor.core.delegation.resource_ticket import ResourceTicketFactory from fabric_cf.actor.core.common.exceptions import BrokerException, ExceptionErrorCode -from fabric_cf.actor.core.kernel.reservation_states import ReservationStates +from fabric_cf.actor.core.kernel.reservation_states import ReservationStates, ReservationOperation from fabric_cf.actor.core.policy.broker_calendar_policy import BrokerCalendarPolicy from fabric_cf.actor.core.policy.fifo_queue import FIFOQueue from fabric_cf.actor.core.policy.network_node_inventory import NetworkNodeInventory @@ -69,6 +69,7 @@ from fabric_cf.actor.fim.plugins.broker.aggregate_bqm_plugin import AggregatedBQMPlugin from fabric_cf.actor.core.util.resource_type import ResourceType from fabric_cf.actor.core.policy.inventory_for_type import InventoryForType +from fim.slivers.interface_info import InterfaceSliver if TYPE_CHECKING: from fabric_cf.actor.core.apis.abc_broker_mixin import ABCBrokerMixin @@ -477,7 +478,8 @@ def ticket(self, *, reservation: ABCBrokerReservation, node_id_to_reservations: self.logger.debug(f"Inventory type: {type(inv)}") term = Term(start=start, end=end) return self.ticket_inventory(reservation=reservation, inv=inv, term=term, - node_id_to_reservations=node_id_to_reservations) + node_id_to_reservations=node_id_to_reservations, + operation=ReservationOperation.Create) else: reservation.fail(message=Constants.NO_POOL) else: @@ -557,7 +559,8 @@ def __prune_nodes_in_maintenance(self, node_id_list: List[str], site: str, reser return node_id_list def __find_first_fit(self, node_id_list: List[str], node_id_to_reservations: dict, inv: NetworkNodeInventory, - reservation: ABCBrokerReservation, term: Term) -> Tuple[str, BaseSliver, Any]: + reservation: ABCBrokerReservation, term: Term, sliver: NodeSliver, + operation: ReservationOperation = ReservationOperation.Create) -> Tuple[str, BaseSliver, Any]: """ Find First Available Node which can serve the reservation @param node_id_list: Candidate Nodes @@ -567,20 +570,17 @@ def __find_first_fit(self, node_id_list: List[str], node_id_to_reservations: dic @return tuple containing delegation id, sliver, error message if any """ delegation_id = None - sliver = None error_msg = None self.logger.debug(f"Possible candidates to serve {reservation} candidates# {node_id_list}") - requested_sliver = reservation.get_requested_resources().get_sliver() - is_create = requested_sliver.get_node_map() is None for node_id in node_id_list: try: self.logger.debug(f"Attempting to allocate {reservation} via graph_node# {node_id}") graph_node = self.get_network_node_from_graph(node_id=node_id) - if requested_sliver.labels is not None and requested_sliver.labels.instance_parent is not None: - self.logger.info(f"Sliver {requested_sliver} is requested on worker: " - f"{requested_sliver.labels.instance_parent}") - if graph_node.get_name() != requested_sliver.labels.instance_parent: + if sliver.labels is not None and sliver.labels.instance_parent is not None: + self.logger.info(f"Sliver {sliver} is requested on worker: " + f"{sliver.labels.instance_parent}") + if graph_node.get_name() != sliver.labels.instance_parent: self.logger.info(f"Skipping candidate node: {graph_node}") continue @@ -589,16 +589,19 @@ def __find_first_fit(self, node_id_list: List[str], node_id_to_reservations: dic start=term.get_start_time(), end=term.get_end_time()) + include_ns = False if operation == ReservationOperation.Extend else True existing_components = self.get_existing_components(node_id=node_id, start=term.get_start_time(), - end=term.get_end_time()) + end=term.get_end_time(), + excludes=[str(reservation.get_reservation_id())], + include_ns=include_ns) delegation_id, sliver = inv.allocate(rid=reservation.get_reservation_id(), - requested_sliver=requested_sliver, + requested_sliver=sliver, graph_id=self.combined_broker_model_graph_id, graph_node=graph_node, existing_reservations=existing_reservations, existing_components=existing_components, - is_create=is_create) + operation=operation) if delegation_id is not None and sliver is not None: break @@ -609,15 +612,16 @@ def __find_first_fit(self, node_id_list: List[str], node_id_to_reservations: dic else: raise e - if delegation_id is None and requested_sliver.labels is not None and \ - requested_sliver.labels.instance_parent is not None: - error_msg = f"Insufficient Resources: {requested_sliver.labels.instance_parent} " \ + if delegation_id is None and sliver.labels is not None and \ + sliver.labels.instance_parent is not None: + error_msg = f"Insufficient Resources: {sliver.labels.instance_parent} " \ f"cannot serve the requested sliver - {error_msg}" return delegation_id, sliver, error_msg def __allocate_nodes(self, *, reservation: ABCBrokerReservation, inv: NetworkNodeInventory, sliver: NodeSliver, - node_id_to_reservations: dict, term: Term) -> Tuple[str or None, BaseSliver, Any]: + node_id_to_reservations: dict, term: Term, + operation: ReservationOperation = ReservationOperation.Create) -> Tuple[str or None, BaseSliver, Any]: """ Allocate Network Node Slivers @param reservation Reservation @@ -642,255 +646,320 @@ def __allocate_nodes(self, *, reservation: ABCBrokerReservation, inv: NetworkNod # no candidate nodes found if len(node_id_list) == 0: - error_msg = f'Insufficient resources: No candidates nodes found to serve {reservation}' + error_msg = f'Insufficient resources: No hosts available to provision the {reservation}' self.logger.error(error_msg) return delegation_id, sliver, error_msg return self.__find_first_fit(node_id_list=node_id_list, node_id_to_reservations=node_id_to_reservations, - inv=inv, reservation=reservation, term=term) + inv=inv, reservation=reservation, term=term, sliver=sliver, + operation=operation) + + def __can_extend_interface_sliver(self, rid: ID, inv: NetworkServiceInventory, + ifs: InterfaceSliver, sliver: NetworkServiceSliver, + node_id_to_reservations: dict, term: Term): + """ + Checks if VLAN attached to an interface are assigned to any advanced reservations in this case + @param rid + @param inv + @param ifs + @param sliver + @param node_id_to_reservations + @param term + + @raises BrokerException in case VLAN is already assigned to any future sliver + """ + ns_node_id, ns_bqm_node_id = sliver.get_node_map() + node_id, bqm_node_id = ifs.get_node_map() + bqm_cp = self.get_interface_sliver_from_graph(node_id=bqm_node_id) + self.logger.debug(f"BQM IFS: {bqm_cp}") + owner_switch, owner_mpls, owner_ns = self.get_owners(node_id=bqm_node_id, ns_type=sliver.get_type()) + self.logger.debug(f"Owner SWITCH: {owner_switch}") + self.logger.debug(f"Owner MPLS: {owner_mpls}") + self.logger.debug(f"Owner NS: {owner_ns}") + + # Handle IPV6Ext services + ns_bqm_node_id = ns_bqm_node_id.node_id.replace('ipv6ext-ns', + 'ipv6-ns') if 'ipv6ext-ns' in ns_bqm_node_id else ns_bqm_node_id + + existing_reservations = self.get_existing_reservations( + node_id=ns_bqm_node_id, + node_id_to_reservations=node_id_to_reservations, + start=term.get_start_time(), + end=term.get_end_time(), + ) + + inv.allocate_ifs( + rid=rid, + requested_ns=sliver, + requested_ifs=ifs, + owner_ns=owner_ns, + bqm_ifs=bqm_cp, + existing_reservations=existing_reservations, + operation=ReservationOperation.Extend + ) def __allocate_services(self, *, rid: ID, inv: NetworkServiceInventory, sliver: NetworkServiceSliver, - node_id_to_reservations: dict, term: Term) -> Tuple[str, BaseSliver, Any]: + node_id_to_reservations: dict, term: Term, + operation: ReservationOperation = ReservationOperation.Create) -> Tuple[str, BaseSliver, Any]: """ Allocate Network Service Slivers @param rid Reservation Id @param inv Inventory @param sliver Requested sliver @param node_id_to_reservations + @param operation @return tuple containing delegation id, sliver, error message if any """ - self.logger.debug(f"Processing Network Service sliver: {sliver}") delegation_id = None error_msg = None - owner_ns = None - owner_ns_id = None - bqm_node = None - is_vnic = False - owner_mpls_ns = None - owner_switch = None - - peered_ns_interfaces = [] - ero_source_end_info = [] - - # For each Interface Sliver; - for ifs in sliver.interface_info.interfaces.values(): - node_map_id = self.combined_broker_model_graph_id - - # Fetch Network Node Id and BQM Component Id - node_id, bqm_node_id = ifs.get_node_map() - - # Skipping the already allocated interface on a modify - if self.combined_broker_model_graph_id in node_id: - continue - - if node_id == str(NodeType.Facility): - bqm_node = self.get_facility_sliver(node_name=bqm_node_id) - # Peered Interfaces are handled at the end - elif node_id == str(Constants.PEERED): - peered_ns_interfaces.append(ifs) - continue - elif node_id == str(NodeType.Switch): - bqm_node = self.get_network_node_from_graph(node_id=bqm_node_id) - node_map_id = f"{node_map_id}#{bqm_node.get_name()}#{bqm_node_id}#{ifs.get_labels().local_name}" - else: - # For VM interfaces - bqm_node = self.get_component_sliver(node_id=bqm_node_id) - node_map_id = f"{node_map_id}:{node_id}:{bqm_node_id}:{ifs.get_labels().bdf}" - - if bqm_node is None: - raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES) - - # Get BQM Connection Point in Site Delegation (c) - site_cp = FimHelper.get_site_interface_sliver(component=bqm_node, - local_name=ifs.get_labels().local_name, - region=ifs.get_labels().region, - device_name=ifs.get_labels().device_name) - self.logger.debug(f"Interface Sliver [Site Delegation] (C): {site_cp}") - - # Get BQM Peer Connection Point in Site Delegation (a) - net_cp = self.get_peer_interface_sliver(site_ifs_id=site_cp.node_id, - interface_type=InterfaceType.TrunkPort) - - if net_cp is None: - error_msg = "Peer Connection Point not found from Network AM" - raise BrokerException(msg=error_msg) - - self.logger.debug(f"Peer Interface Sliver [Network Delegation] (A): {net_cp}") - - # need to find the owner switch of the network service in CBM and take it's name or labels.local_name - owner_switch, owner_mpls_ns, owner_ns = self.get_owners(node_id=net_cp.node_id, - ns_type=sliver.get_type()) - - # Hack for IPV6Ext services - owner_ns_id = owner_ns.node_id - if 'ipv6ext-ns' in owner_ns_id: - owner_ns_id = owner_ns_id.replace('ipv6ext-ns', 'ipv6-ns') - - bqm_cp = net_cp - if bqm_node.get_type() == NodeType.Facility or \ - (sliver.get_type() == ServiceType.L2Bridge and - bqm_node.get_model() == Constants.OPENSTACK_VNIC_MODEL): - bqm_cp = site_cp - - if bqm_node.get_type() == ComponentType.SharedNIC: - if bqm_node.get_model() == Constants.OPENSTACK_VNIC_MODEL: - is_vnic = True - - # VLAN is already set by the Orchestrator using the information from the Node Sliver Parent Reservation - if ifs.get_labels().vlan is None and not is_vnic: - message = "Shared NIC VLAN cannot be None" - self.logger.error(message) - raise BrokerException(error_code=ExceptionErrorCode.FAILURE, - msg=f"{message}") - else: - existing_reservations = self.get_existing_reservations(node_id=owner_ns_id, - node_id_to_reservations=node_id_to_reservations, - start=term.get_start_time(), - end=term.get_end_time()) - # Set vlan - source: (c) - only for dedicated NICs - ifs = inv.allocate_ifs(requested_ns=sliver, requested_ifs=ifs, owner_ns=owner_ns, - bqm_ifs=bqm_cp, existing_reservations=existing_reservations) - - local_name = net_cp.get_name() - device_name = owner_switch.get_name() + try: + self.logger.debug(f"Processing Network Service sliver: {sliver}") + owner_ns = None + owner_ns_id = None + bqm_node = None + is_vnic = False + owner_mpls_ns = None + owner_switch = None - if device_name == Constants.AL2S: - delegation_id, delegated_label = InventoryForType.get_delegations(lab_cap_delegations= - net_cp.get_label_delegations()) - device_name = delegated_label.device_name - local_name = delegated_label.local_name - - # local_name source: (a) - ifs_labels = ifs.get_labels() - ifs_labels = Labels.update(ifs_labels, local_name=local_name) - - # NSO device name source: (a) - need to find the owner switch of the network service in CBM - # and take its name or labels.local_name - # Set the NSO device-name - ifs_labels = Labels.update(ifs_labels, device_name=device_name) - adm_ids = owner_switch.get_structural_info().adm_graph_ids - site_adm_ids = bqm_node.get_structural_info().adm_graph_ids - - self.logger.debug(f"Owner Network Service: {owner_ns}") - self.logger.debug(f"Owner Switch: {owner_switch}") - if owner_switch.network_service_info is not None: - self.logger.debug(f"Owner Switch NS: {owner_switch.network_service_info.network_services.values()}") - - net_adm_ids = site_adm_ids - if bqm_node.get_type() != NodeType.Facility and not is_vnic: - net_adm_ids = [x for x in adm_ids if not x in site_adm_ids or site_adm_ids.remove(x)] - # For sites like EDC which share switch with other sites like NCSA, - # the net_adm_ids also includes delegation id from the other side, - # this results in this list having more than one entry and no way for - # the code to know which delegation is from Network AM - # Using a hack here to pick the delegation id from one of the - # layer 3 network services in the owner switch - if len(net_adm_ids) > 1: - for x in owner_switch.network_service_info.network_services.values(): - if x.get_layer() == NSLayer.L2: - continue - net_adm_ids = x.get_structural_info().adm_graph_ids - break - else: - if bqm_cp.labels is not None and bqm_cp.labels.ipv4_subnet is not None: - ifs_labels = Labels.update(ifs_labels, ipv4_subnet=bqm_cp.labels.ipv4_subnet) - if bqm_cp.labels is not None and bqm_cp.labels.ipv6_subnet is not None: - ifs_labels = Labels.update(ifs_labels, ipv6_subnet=bqm_cp.labels.ipv6_subnet) - if len(net_adm_ids) != 1: - error_msg = f"More than 1 or 0 Network Delegations found! net_adm_ids: {net_adm_ids}" - self.logger.error(error_msg) - raise BrokerException(msg=error_msg) + peered_ns_interfaces = [] + ero_source_end_info = [] - if bqm_node.get_type() == NodeType.Facility: - node_map_id = f"{node_map_id}#{bqm_node.get_name()}#{bqm_cp.node_id}#{ifs_labels.vlan}" + # For each Interface Sliver; + for ifs in sliver.interface_info.interfaces.values(): + node_map_id = self.combined_broker_model_graph_id - # Update the Interface Sliver Node Map to map to (a) - ifs.set_node_map(node_map=(node_map_id, bqm_cp.node_id)) - #ifs.set_node_map(node_map=(self.combined_broker_model_graph_id, bqm_cp.node_id)) + # Fetch Network Node Id and BQM Component Id + node_id, bqm_node_id = ifs.get_node_map() - delegation_id = net_adm_ids[0] + # Skipping the already allocated interface on a modify + if self.combined_broker_model_graph_id in node_id: - ifs.labels = ifs_labels - ifs.label_allocations = Labels.update(lab=ifs_labels) + if operation == ReservationOperation.Extend: + self.__can_extend_interface_sliver(rid=rid, inv=inv, ifs=ifs, sliver=sliver, + node_id_to_reservations=node_id_to_reservations, term=term) + continue - self.logger.info(f"Allocated Interface Sliver: {ifs} delegation: {delegation_id}") + if node_id == str(NodeType.Facility): + bqm_node = self.get_facility_sliver(node_name=bqm_node_id) + # Peered Interfaces are handled at the end + elif node_id == str(Constants.PEERED): + peered_ns_interfaces.append(ifs) + continue + elif node_id == str(NodeType.Switch): + bqm_node = self.get_network_node_from_graph(node_id=bqm_node_id) + node_map_id = f"{node_map_id}#{bqm_node.get_name()}#{bqm_node_id}#{ifs.get_labels().local_name}" + else: + # For VM interfaces + bqm_node = self.get_component_sliver(node_id=bqm_node_id) + node_map_id = f"{node_map_id}:{node_id}:{bqm_node_id}:{ifs.get_labels().bdf}" + + if bqm_node is None: + raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES) + + # Get BQM Connection Point in Site Delegation (c) + site_cp = FimHelper.get_site_interface_sliver(component=bqm_node, + local_name=ifs.get_labels().local_name, + region=ifs.get_labels().region, + device_name=ifs.get_labels().device_name) + self.logger.debug(f"Interface Sliver [Site Delegation] (C): {site_cp}") + + # Get BQM Peer Connection Point in Site Delegation (a) + net_cp = self.get_peer_interface_sliver(site_ifs_id=site_cp.node_id, + interface_type=InterfaceType.TrunkPort) + + if net_cp is None: + error_msg = "Peer Connection Point not found from Network AM" + raise BrokerException(msg=error_msg) + + self.logger.debug(f"Peer Interface Sliver [Network Delegation] (A): {net_cp}") + + # need to find the owner switch of the network service in CBM and take it's name or labels.local_name + owner_switch, owner_mpls_ns, owner_ns = self.get_owners(node_id=net_cp.node_id, + ns_type=sliver.get_type()) + + # Hack for IPV6Ext services + owner_ns_id = owner_ns.node_id + if 'ipv6ext-ns' in owner_ns_id: + owner_ns_id = owner_ns_id.replace('ipv6ext-ns', 'ipv6-ns') + + bqm_cp = net_cp + if bqm_node.get_type() == NodeType.Facility or \ + (sliver.get_type() == ServiceType.L2Bridge and + bqm_node.get_model() == Constants.OPENSTACK_VNIC_MODEL): + bqm_cp = site_cp + + if bqm_node.get_type() == ComponentType.SharedNIC: + if bqm_node.get_model() == Constants.OPENSTACK_VNIC_MODEL: + is_vnic = True + + # VLAN is already set by the Orchestrator using the information from the Node Sliver Parent Reservation + if ifs.get_labels().vlan is None and not is_vnic: + message = "Shared NIC VLAN cannot be None" + self.logger.error(message) + raise BrokerException(error_code=ExceptionErrorCode.FAILURE, + msg=f"{message}") + else: + existing_reservations = self.get_existing_reservations(node_id=owner_ns_id, + node_id_to_reservations=node_id_to_reservations, + start=term.get_start_time(), + end=term.get_end_time()) + # Set vlan - source: (c) - only for dedicated NICs + ifs = inv.allocate_ifs(rid=rid, requested_ns=sliver, requested_ifs=ifs, owner_ns=owner_ns, + bqm_ifs=bqm_cp, existing_reservations=existing_reservations) + + local_name = net_cp.get_name() + device_name = owner_switch.get_name() + + if device_name == Constants.AL2S: + delegation_id, delegated_label = InventoryForType.get_delegations(lab_cap_delegations= + net_cp.get_label_delegations()) + device_name = delegated_label.device_name + local_name = delegated_label.local_name + + # local_name source: (a) + ifs_labels = ifs.get_labels() + ifs_labels = Labels.update(ifs_labels, local_name=local_name) + + # NSO device name source: (a) - need to find the owner switch of the network service in CBM + # and take its name or labels.local_name + # Set the NSO device-name + ifs_labels = Labels.update(ifs_labels, device_name=device_name) + adm_ids = owner_switch.get_structural_info().adm_graph_ids + site_adm_ids = bqm_node.get_structural_info().adm_graph_ids + + self.logger.debug(f"Owner Network Service: {owner_ns}") + self.logger.debug(f"Owner Switch: {owner_switch}") + if owner_switch.network_service_info is not None: + self.logger.debug(f"Owner Switch NS: {owner_switch.network_service_info.network_services.values()}") + + net_adm_ids = site_adm_ids + if bqm_node.get_type() != NodeType.Facility and not is_vnic: + net_adm_ids = [x for x in adm_ids if not x in site_adm_ids or site_adm_ids.remove(x)] + # For sites like EDC which share switch with other sites like NCSA, + # the net_adm_ids also includes delegation id from the other side, + # this results in this list having more than one entry and no way for + # the code to know which delegation is from Network AM + # Using a hack here to pick the delegation id from one of the + # layer 3 network services in the owner switch + if len(net_adm_ids) > 1: + for x in owner_switch.network_service_info.network_services.values(): + if x.get_layer() == NSLayer.L2: + continue + net_adm_ids = x.get_structural_info().adm_graph_ids + break + else: + if bqm_cp.labels is not None and bqm_cp.labels.ipv4_subnet is not None: + ifs_labels = Labels.update(ifs_labels, ipv4_subnet=bqm_cp.labels.ipv4_subnet) + if bqm_cp.labels is not None and bqm_cp.labels.ipv6_subnet is not None: + ifs_labels = Labels.update(ifs_labels, ipv6_subnet=bqm_cp.labels.ipv6_subnet) + if len(net_adm_ids) != 1: + error_msg = f"More than 1 or 0 Network Delegations found! net_adm_ids: {net_adm_ids}" + self.logger.error(error_msg) + raise BrokerException(msg=error_msg) + + if bqm_node.get_type() == NodeType.Facility: + node_map_id = f"{node_map_id}#{bqm_node.get_name()}#{bqm_cp.node_id}#{ifs_labels.vlan}" + + # Update the Interface Sliver Node Map to map to (a) + ifs.set_node_map(node_map=(node_map_id, bqm_cp.node_id)) + #ifs.set_node_map(node_map=(self.combined_broker_model_graph_id, bqm_cp.node_id)) + + delegation_id = net_adm_ids[0] + + ifs.labels = ifs_labels + ifs.label_allocations = Labels.update(lab=ifs_labels) + + self.logger.info(f"Allocated Interface Sliver: {ifs} delegation: {delegation_id}") + + owner_v4_service = self.get_ns_from_switch(switch=owner_switch, ns_type=ServiceType.FABNetv4) + if owner_v4_service and owner_v4_service.get_labels(): + ero_source_end_info.append((owner_switch.node_id, owner_v4_service.get_labels().ipv4)) + + if not owner_ns: + bqm_graph_id, bqm_node_id = sliver.get_node_map() + owner_ns, owner_switch = self.get_network_service_from_graph(node_id=bqm_node_id, + parent=True) + # Hack for IPV6Ext services + owner_ns_id = owner_ns.node_id + if 'ipv6ext-ns' in owner_ns_id: + owner_ns_id = owner_ns_id.replace('ipv6ext-ns', 'ipv6-ns') + + owner_mpls_ns = None + if owner_switch: + for ns in owner_switch.network_service_info.network_services.values(): + if ServiceType.MPLS == ns.get_type(): + owner_mpls_ns = ns + break + delegation_id, delegated_label = InventoryForType.get_delegations(lab_cap_delegations= + owner_ns.get_label_delegations()) - owner_v4_service = self.get_ns_from_switch(switch=owner_switch, ns_type=ServiceType.FABNetv4) - if owner_v4_service and owner_v4_service.get_labels(): - ero_source_end_info.append((owner_switch.node_id, owner_v4_service.get_labels().ipv4)) + # Set the Subnet and gateway from the Owner Switch (a) + existing_reservations = self.get_existing_reservations(node_id=owner_ns_id, + node_id_to_reservations=node_id_to_reservations, + start=term.get_start_time(), end=term.get_end_time()) - if not owner_ns: - bqm_graph_id, bqm_node_id = sliver.get_node_map() - owner_ns, owner_switch = self.get_network_service_from_graph(node_id=bqm_node_id, - parent=True) - owner_mpls_ns = None - if owner_switch: - for ns in owner_switch.network_service_info.network_services.values(): - if ServiceType.MPLS == ns.get_type(): - owner_mpls_ns = ns - break - delegation_id, delegated_label = InventoryForType.get_delegations(lab_cap_delegations= - owner_ns.get_label_delegations()) - - # Update the Network Service Sliver Node Map to map to parent of (a) - sliver.set_node_map(node_map=(self.combined_broker_model_graph_id, owner_ns_id)) - - # Set the Subnet and gateway from the Owner Switch (a) - existing_reservations = self.get_existing_reservations(node_id=owner_ns_id, - node_id_to_reservations=node_id_to_reservations, - start=term.get_start_time(), end=term.get_end_time()) - - # Allocate VLAN for the Network Service - if is_vnic: - site_adm_ids = bqm_node.get_structural_info().adm_graph_ids - delegation_id = site_adm_ids[0] - inv.allocate_vnic(rid=rid, requested_ns=sliver, owner_ns=owner_ns, - existing_reservations=existing_reservations) - else: - sliver = inv.allocate(rid=rid, requested_ns=sliver, owner_ns=owner_ns, + # Allocate VLAN for the Network Service + if is_vnic: + site_adm_ids = bqm_node.get_structural_info().adm_graph_ids + delegation_id = site_adm_ids[0] + inv.allocate_vnic(rid=rid, requested_ns=sliver, owner_ns=owner_ns, existing_reservations=existing_reservations) - - self.__allocate_peered_interfaces(peered_interfaces=peered_ns_interfaces, owner_switch=owner_switch, - owner_mpls=owner_mpls_ns, inv=inv, sliver=sliver, owner_ns=owner_ns, - node_id_to_reservations=node_id_to_reservations, term=term) - - if sliver.ero and len(sliver.ero.get()) and len(ero_source_end_info) == 2: - self.logger.info(f"Requested ERO: {sliver.ero}") - ero_hops = [] - new_path = [ero_source_end_info[0][1]] - type, path = sliver.ero.get() - for hop in path.get()[0]: - # User passes the site names; Broker maps the sites names to the respective switch IP - hop_switch = self.get_switch_sliver(site=hop) - self.logger.debug(f"Switch information for {hop}: {hop_switch}") - if not hop_switch: - self.logger.error(f"Requested hop: {hop} in the ERO does not exist") - raise BrokerException(error_code=ExceptionErrorCode.INVALID_ARGUMENT, - msg=f"Requested hop: {hop} in the ERO does not exist ") - - hop_v4_service = self.get_ns_from_switch(switch=hop_switch, ns_type=ServiceType.FABNetv4) - if hop_v4_service and hop_v4_service.get_labels() and hop_v4_service.get_labels().ipv4: - self.logger.debug(f"Fabnetv4 information for {hop}: {hop_v4_service}") - ero_hops.append(f"{hop_switch.node_id}-ns") - new_path.append(hop_v4_service.get_labels().ipv4) - - new_path.append(ero_source_end_info[1][1]) - - if len(new_path): - if not self.validate_requested_ero_path(source_node=ero_source_end_info[0][0], - end_node=ero_source_end_info[1][0], - hops=ero_hops): - raise BrokerException(error_code=ExceptionErrorCode.INVALID_ARGUMENT, - msg=f"Requested ERO path: {sliver.ero} is invalid!") - ero_path = Path() - ero_path.set_symmetric(new_path) - sliver.ero.set(ero_path) - self.logger.info(f"Allocated ERO: {sliver.ero}") - + else: + sliver = inv.allocate(rid=rid, requested_ns=sliver, owner_ns=owner_ns, + existing_reservations=existing_reservations) + + # Update the Network Service Sliver Node Map to map to parent of (a) + sliver.set_node_map(node_map=(self.combined_broker_model_graph_id, owner_ns_id)) + + self.__allocate_peered_interfaces(rid=rid, peered_interfaces=peered_ns_interfaces, owner_switch=owner_switch, + owner_mpls=owner_mpls_ns, inv=inv, sliver=sliver, owner_ns=owner_ns, + node_id_to_reservations=node_id_to_reservations, term=term) + + if sliver.ero and len(sliver.ero.get()) and len(ero_source_end_info) == 2: + self.logger.info(f"Requested ERO: {sliver.ero}") + ero_hops = [] + new_path = [ero_source_end_info[0][1]] + type, path = sliver.ero.get() + for hop in path.get()[0]: + # User passes the site names; Broker maps the sites names to the respective switch IP + hop_switch = self.get_switch_sliver(site=hop) + self.logger.debug(f"Switch information for {hop}: {hop_switch}") + if not hop_switch: + self.logger.error(f"Requested hop: {hop} in the ERO does not exist") + raise BrokerException(error_code=ExceptionErrorCode.INVALID_ARGUMENT, + msg=f"Requested hop: {hop} in the ERO does not exist ") + + hop_v4_service = self.get_ns_from_switch(switch=hop_switch, ns_type=ServiceType.FABNetv4) + if hop_v4_service and hop_v4_service.get_labels() and hop_v4_service.get_labels().ipv4: + self.logger.debug(f"Fabnetv4 information for {hop}: {hop_v4_service}") + ero_hops.append(f"{hop_switch.node_id}-ns") + new_path.append(hop_v4_service.get_labels().ipv4) + + new_path.append(ero_source_end_info[1][1]) + + if len(new_path): + if not self.validate_requested_ero_path(source_node=ero_source_end_info[0][0], + end_node=ero_source_end_info[1][0], + hops=ero_hops): + raise BrokerException(error_code=ExceptionErrorCode.INVALID_ARGUMENT, + msg=f"Requested ERO path: {sliver.ero} is invalid!") + ero_path = Path() + ero_path.set_symmetric(new_path) + sliver.ero.set(ero_path) + self.logger.info(f"Allocated ERO: {sliver.ero}") + + except BrokerException as e: + delegation_id = None + if e.error_code == ExceptionErrorCode.INSUFFICIENT_RESOURCES: + self.logger.error(f"Exception occurred: {e}") + error_msg = e.msg + else: + raise e + self.logger.debug(f"Allocate Services returning: {delegation_id} {sliver} {error_msg}") return delegation_id, sliver, error_msg - def __allocate_peered_interfaces(self, *, peered_interfaces: List[InterfaceSliver], owner_switch: NodeSliver, + def __allocate_peered_interfaces(self, *, rid: ID, peered_interfaces: List[InterfaceSliver], owner_switch: NodeSliver, inv: NetworkServiceInventory, sliver: NetworkServiceSliver, owner_mpls: NetworkServiceSliver, owner_ns: NetworkServiceSliver, node_id_to_reservations: dict, term: Term): @@ -944,7 +1013,7 @@ def __allocate_peered_interfaces(self, *, peered_interfaces: List[InterfaceSlive start=term.get_start_time(), end=term.get_end_time()) - pfs = inv.allocate_peered_ifs(owner_switch=owner_switch, requested_ifs=pfs, + pfs = inv.allocate_peered_ifs(rid=rid, owner_switch=owner_switch, requested_ifs=pfs, bqm_interface=bqm_interface, existing_reservations=existing_reservations) @@ -958,10 +1027,14 @@ def __allocate_peered_interfaces(self, *, peered_interfaces: List[InterfaceSlive sliver.set_node_map(node_map=(self.combined_broker_model_graph_id, owner_ns.node_id)) def ticket_inventory(self, *, reservation: ABCBrokerReservation, inv: InventoryForType, term: Term, - node_id_to_reservations: dict) -> Tuple[bool, dict, Any]: + node_id_to_reservations: dict, + operation: ReservationOperation = ReservationOperation.Create) -> Tuple[bool, dict, Any]: error_msg = None try: - rset = reservation.get_requested_resources() + if operation == ReservationOperation.Extend: + rset = reservation.get_resources() + else: + rset = reservation.get_requested_resources() needed = rset.get_units() # for network node slivers @@ -977,13 +1050,14 @@ def ticket_inventory(self, *, reservation: ABCBrokerReservation, inv: InventoryF delegation_id, sliver, error_msg = self.__allocate_nodes(reservation=reservation, inv=inv, sliver=res_sliver, node_id_to_reservations=node_id_to_reservations, - term=term) + term=term, + operation=operation) elif isinstance(res_sliver, NetworkServiceSliver): delegation_id, sliver, error_msg = self.__allocate_services(rid=reservation.get_reservation_id(), inv=inv, sliver=res_sliver, node_id_to_reservations=node_id_to_reservations, - term=term) + term=term, operation=operation) else: self.logger.error(f'Reservation {reservation} sliver type is neither Node, nor NetworkServiceSliver') raise BrokerException(msg=f"Reservation sliver type is neither Node " @@ -999,11 +1073,13 @@ def ticket_inventory(self, *, reservation: ABCBrokerReservation, inv: InventoryF if node_id_to_reservations.get(node_id, None) is None: node_id_to_reservations[node_id] = ReservationSet() node_id_to_reservations[node_id].add(reservation=reservation) + self.logger.debug(f"Ticket Inventory returning: True {error_msg}") return True, node_id_to_reservations, error_msg except Exception as e: self.logger.error(traceback.format_exc()) self.logger.error(e) reservation.fail(message=str(e)) + self.logger.debug(f"Ticket Inventory returning: False {error_msg}") return False, node_id_to_reservations, error_msg def __is_modify_on_openstack_vnic(self, *, sliver: BaseSliver) -> bool: @@ -1031,18 +1107,23 @@ def extend_private(self, *, reservation: ABCBrokerReservation, inv: InventoryFor sliver = current_resources.get_sliver() diff = sliver.diff(other_sliver=requested_resources.get_sliver()) + operation = ReservationOperation.Extend if diff is not None: sliver = requested_resources.get_sliver() + operation = ReservationOperation.Modify - if diff is None or diff.added is None or \ - (len(diff.added.components) == 0 and len(diff.added.interfaces) == 0) or \ - self.__is_modify_on_openstack_vnic(sliver=sliver): + #if diff is None or diff.added is None or \ + # (len(diff.added.components) == 0 and len(diff.added.interfaces) == 0) or \ + # self.__is_modify_on_openstack_vnic(sliver=sliver): + + if self.__is_modify_on_openstack_vnic(sliver=sliver): self.issue_ticket(reservation=reservation, units=needed, rtype=requested_resources.get_type(), term=term, source=reservation.get_source(), sliver=sliver) else: status, node_id_to_reservations, error_msg = self.ticket_inventory(reservation=reservation, inv=inv, term=term, - node_id_to_reservations=node_id_to_reservations) + node_id_to_reservations=node_id_to_reservations, + operation=operation) if not status and not reservation.is_failed(): fail_message = f"Insufficient resources for specified start time, Failing reservation: " \ f"{reservation.get_reservation_id()}" @@ -1410,6 +1491,20 @@ def get_owners(self, *, node_id: str, ns_type: ServiceType) -> Tuple[NodeSliver, finally: self.lock.release() + def get_interface_sliver_from_graph(self, *, node_id: str) -> InterfaceSliver or None: + """ + Get InterfaceSliver from CBM + :param node_id: + :return: + """ + try: + self.lock.acquire() + if self.combined_broker_model is None: + return None + return self.combined_broker_model.build_deep_interface_sliver(node_id=node_id) + finally: + self.lock.release() + def get_network_node_from_graph(self, *, node_id: str) -> NodeSliver or None: """ Get Node from CBM @@ -1485,12 +1580,17 @@ def get_existing_reservations(self, node_id: str, node_id_to_reservations: dict, return existing_reservations - def get_existing_components(self, node_id: str, start: datetime = None, end: datetime = None) -> Dict[str, List[str]]: + def get_existing_components(self, node_id: str, start: datetime = None, end: datetime = None, + excludes: List[str] = None, include_ns: bool = True, + include_node: bool = True) -> Dict[str, List[str]]: """ Get existing components attached to Active/Ticketed Network Service Slivers :param node_id: :param start: :param end: + :param excludes: + :param include_node: + :param include_ns: :return: list of components """ states = [ReservationStates.Active.value, @@ -1500,12 +1600,17 @@ def get_existing_components(self, node_id: str, start: datetime = None, end: dat ReservationStates.CloseFail.value] res_type = [] - for x in ServiceType: - res_type.append(str(x)) + if include_ns: + for x in ServiceType: + res_type.append(str(x)) + + if include_node: + for x in NodeType: + res_type.append(str(x)) # Only get Active or Ticketing reservations return self.actor.get_plugin().get_database().get_components(node_id=node_id, rsv_type=res_type, states=states, - start=start, end=end) + start=start, end=end, excludes=excludes) def set_logger(self, logger): """ diff --git a/fabric_cf/actor/core/policy/inventory.py b/fabric_cf/actor/core/policy/inventory.py index 7ed0f285..f5f9621d 100644 --- a/fabric_cf/actor/core/policy/inventory.py +++ b/fabric_cf/actor/core/policy/inventory.py @@ -24,6 +24,7 @@ # # Author: Komal Thareja (kthare10@renci.org) from __future__ import annotations + from typing import TYPE_CHECKING from fabric_cf.actor.core.common.constants import Constants diff --git a/fabric_cf/actor/core/policy/network_node_inventory.py b/fabric_cf/actor/core/policy/network_node_inventory.py index 92304e8d..83b2483c 100644 --- a/fabric_cf/actor/core/policy/network_node_inventory.py +++ b/fabric_cf/actor/core/policy/network_node_inventory.py @@ -37,6 +37,7 @@ from fabric_cf.actor.core.apis.abc_reservation_mixin import ABCReservationMixin from fabric_cf.actor.core.common.constants import Constants from fabric_cf.actor.core.common.exceptions import BrokerException, ExceptionErrorCode +from fabric_cf.actor.core.kernel.reservation_states import ReservationOperation from fabric_cf.actor.core.policy.inventory_for_type import InventoryForType from fabric_cf.actor.core.util.id import ID @@ -230,13 +231,13 @@ def __update_smart_nic_labels_and_capacities(self, *, available_component: Compo def __check_component_labels_and_capacities(self, *, available_component: ComponentSliver, graph_id: str, requested_component: ComponentSliver, - is_create: bool = False) -> ComponentSliver: + operation: ReservationOperation = ReservationOperation.Create) -> ComponentSliver: """ Check if available component capacities, labels to match requested component :param available_component: available component :param graph_id: BQM graph id :param requested_component: requested component - :param is_create: is_create + :param operation: operation :return: requested component annotated with properties in case of success, None otherwise """ if requested_component.get_model() is not None and \ @@ -271,7 +272,7 @@ def __check_component_labels_and_capacities(self, *, available_component: Compon node_map = tuple([graph_id, available_component.node_id]) requested_component.set_node_map(node_map=node_map) - if requested_component.labels is None or is_create: + if requested_component.labels is None or operation == ReservationOperation.Create: requested_component.labels = Labels.update(lab=requested_component.get_label_allocations()) return requested_component @@ -342,7 +343,8 @@ def __exclude_allocated_component(self, *, graph_node: NodeSliver, available_com graph_node.attached_components_info.remove_device(name=available_component.get_name()) def __exclude_components_for_existing_reservations(self, *, rid: ID, graph_node: NodeSliver, - existing_reservations: List[ABCReservationMixin]) -> NodeSliver: + existing_reservations: List[ABCReservationMixin], + operation: ReservationOperation = ReservationOperation.Create) -> NodeSliver: """ Remove already assigned components to existing reservations from the candidate node @param rid reservation ID @@ -352,7 +354,8 @@ def __exclude_components_for_existing_reservations(self, *, rid: ID, graph_node: """ for reservation in existing_reservations: # Requested reservation should be skipped only when new i.e. not ticketed - if rid == reservation.get_reservation_id() and not reservation.is_ticketed(): + if rid == reservation.get_reservation_id() and \ + (operation == ReservationOperation.Extend or not reservation.is_ticketed()): continue # For Active or Ticketed or Ticketing reservations; reduce the counts from available allocated_sliver = None @@ -391,7 +394,7 @@ def __exclude_components_for_existing_reservations(self, *, rid: ID, graph_node: def __check_components(self, *, rid: ID, requested_components: AttachedComponentsInfo, graph_id: str, graph_node: NodeSliver, existing_reservations: List[ABCReservationMixin], existing_components: Dict[str, List[str]], - is_create: bool = False) -> AttachedComponentsInfo: + operation: ReservationOperation = ReservationOperation.Create) -> AttachedComponentsInfo: """ Check if the requested capacities can be satisfied with the available capacities :param rid: reservation id of the reservation being served @@ -399,12 +402,15 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent :param graph_id: BQM graph id :param graph_node: BQM graph node identified to serve the reservation :param existing_reservations: Existing Reservations served by the same BQM node - :param is_create: Flag indicating if this is create or modify + :param operation: Flag indicating if this is create or modify :return: Components updated with the corresponding BQM node ids :raises: BrokerException in case the request cannot be satisfied """ + self.logger.debug(f"Available on {graph_node.node_id} components: {graph_node.attached_components_info.devices.keys()}") + self.__exclude_components_for_existing_reservations(rid=rid, graph_node=graph_node, - existing_reservations=existing_reservations) + existing_reservations=existing_reservations, + operation=operation) self.logger.debug(f"Excluding components connected to Network Services: {existing_components}") @@ -429,15 +435,42 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent comps_to_remove.append(av) for c in comps_to_remove: + self.logger.debug(f"Excluding component: {c.get_name()}") + print(f"Excluding component: {c.get_name()}") graph_node.attached_components_info.remove_device(name=c.get_name()) self.logger.debug(f"requested_components: {requested_components.devices.values()} for reservation# {rid}") for name, requested_component in requested_components.devices.items(): - if not is_create and requested_component.get_node_map() is not None: - self.logger.debug(f"==========Ignoring Allocated component: {requested_component} for modify") - # TODO exclude already allocated component to the same reservation + if operation == ReservationOperation.Modify and requested_component.get_node_map() is not None: + self.logger.debug(f"Modify: Ignoring Allocated component: {requested_component}") + continue + + if operation == ReservationOperation.Extend and requested_component.get_node_map() is not None: + bqm_id, node_id = requested_component.get_node_map() + + if requested_component.get_type() == ComponentType.SharedNIC: + allocated_bdfs = existing_components.get(node_id) + if allocated_bdfs and requested_component.labels and requested_component.labels.bdf: + bdfs = requested_component.labels.bdf + if isinstance(requested_component.labels.bdf, str): + bdfs = [requested_component.labels.bdf] + + self.logger.debug(f"Allocated BDFs: {allocated_bdfs}") + for x in bdfs: + if x in allocated_bdfs: + raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, + msg=f"Renew failed: Component of type: {requested_component.get_model()} with PCI Address: {x}" + f"already in use by another reservation for node: {graph_node.node_id}") + else: + if node_id in existing_components.keys(): + raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, + msg=f"Renew failed: Component of type: {requested_component.get_model()} " + f"already in use by another reservation for node: {graph_node.node_id}") + + self.logger.debug(f"Renew: Component {requested_component} still available") continue - self.logger.debug(f"==========Allocating component: {requested_component}") + + self.logger.debug(f"Create: Allocating component: {requested_component}") resource_type = requested_component.get_type() resource_model = requested_component.get_model() if resource_type == ComponentType.Storage: @@ -446,7 +479,8 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent requested_component.label_allocations = Labels.update(lab=requested_component.get_labels()) continue available_components = graph_node.attached_components_info.get_devices_by_type(resource_type=resource_type) - self.logger.debug(f"available_components after excluding allocated components: {available_components}") + self.logger.debug(f"Available components of type: {resource_type} after excluding " + f"allocated components: {available_components}") if available_components is None or len(available_components) == 0: raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, @@ -458,7 +492,7 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent requested_component = self.__check_component_labels_and_capacities( available_component=component, graph_id=graph_id, requested_component=requested_component, - is_create=is_create) + operation=operation) if requested_component.get_node_map() is not None: self.logger.info(f"Assigning {component.node_id} to component# " @@ -478,7 +512,7 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent def __allocate_p4_switch(self, *, rid: ID, requested_sliver: NodeSliver, graph_id: str, graph_node: NodeSliver, existing_reservations: List[ABCReservationMixin], existing_components: Dict[str, List[str]], - is_create: bool = False) -> Tuple[str, BaseSliver]: + operation: ReservationOperation = ReservationOperation.Create) -> Tuple[str, BaseSliver]: """ Allocate an extending or ticketing reservation for a P4 switch @@ -488,21 +522,18 @@ def __allocate_p4_switch(self, *, rid: ID, requested_sliver: NodeSliver, graph_i :param graph_node: BQM graph node identified to serve the reservation :param existing_components: Existing Components :param existing_reservations: Existing Reservations served by the same BQM node - :param is_create: Indicates if this is create or modify + :param operation: Indicates if this is create or modify :return: Tuple of Delegation Id and the Requested Sliver annotated with BQM Node Id and other properties :raises: BrokerException in case the request cannot be satisfied """ delegation_id = None - if not is_create: + if operation == ReservationOperation.Create: # In case of modify, directly get delegation_id if len(graph_node.get_capacity_delegations().get_delegation_ids()) > 0: delegation_id = next(iter(graph_node.get_capacity_delegations().get_delegation_ids())) - # Nothing to do, just return - return delegation_id, requested_sliver - # Handle allocation to account for leaked Network Services for n in existing_components.keys(): if n in graph_node.node_id: @@ -532,7 +563,7 @@ def __allocate_p4_switch(self, *, rid: ID, requested_sliver: NodeSliver, graph_i def allocate(self, *, rid: ID, requested_sliver: BaseSliver, graph_id: str, graph_node: BaseSliver, existing_reservations: List[ABCReservationMixin], existing_components: Dict[str, List[str]], - is_create: bool = False) -> Tuple[str, BaseSliver]: + operation: ReservationOperation = ReservationOperation.Create) -> Tuple[str, BaseSliver]: """ Allocate an extending or ticketing reservation :param rid: reservation id of the reservation to be allocated @@ -541,7 +572,7 @@ def allocate(self, *, rid: ID, requested_sliver: BaseSliver, graph_id: str, grap :param graph_node: BQM graph node identified to serve the reservation :param existing_components: Existing Components :param existing_reservations: Existing Reservations served by the same BQM node - :param is_create: Indicates if this is create or modify + :param operation: Indicates if this is create or modify :return: Tuple of Delegation Id and the Requested Sliver annotated with BQM Node Id and other properties :raises: BrokerException in case the request cannot be satisfied """ @@ -564,27 +595,28 @@ def allocate(self, *, rid: ID, requested_sliver: BaseSliver, graph_id: str, grap if requested_sliver.get_type() == NodeType.Switch: return self.__allocate_p4_switch(rid=rid, requested_sliver=requested_sliver, graph_id=graph_id, graph_node=graph_node, existing_reservations=existing_reservations, - existing_components=existing_components, is_create=is_create) + existing_components=existing_components, operation=operation) delegation_id = None requested_capacities = None # For create, we need to allocate the VM - if is_create: + if operation == ReservationOperation.Create: # Always use requested capacities to be mapped from flavor i.e. capacity hints requested_capacity_hints = requested_sliver.get_capacity_hints() catalog = InstanceCatalog() requested_capacities = catalog.get_instance_capacities(instance_type=requested_capacity_hints.instance_type) - - # Check if Capacities can be satisfied - delegation_id = self.__check_capacities(rid=rid, - requested_capacities=requested_capacities, - delegated_capacities=graph_node.get_capacity_delegations(), - existing_reservations=existing_reservations) else: + requested_capacities = requested_sliver.get_capacity_allocations() # In case of modify, directly get delegation_id if len(graph_node.get_capacity_delegations().get_delegation_ids()) > 0: delegation_id = next(iter(graph_node.get_capacity_delegations().get_delegation_ids())) + # Check if Capacities can be satisfied + delegation_id = self.__check_capacities(rid=rid, + requested_capacities=requested_capacities, + delegated_capacities=graph_node.get_capacity_delegations(), + existing_reservations=existing_reservations) + # Check if Components can be allocated if requested_sliver.attached_components_info is not None: requested_sliver.attached_components_info = self.__check_components( @@ -594,10 +626,10 @@ def allocate(self, *, rid: ID, requested_sliver: BaseSliver, graph_id: str, grap graph_node=graph_node, existing_reservations=existing_reservations, existing_components=existing_components, - is_create=is_create) + operation=operation) # Do this only for create - if is_create: + if operation == ReservationOperation.Create: requested_sliver.capacity_allocations = Capacities() requested_sliver.capacity_allocations = Capacities.update(lab=requested_capacities) requested_sliver.label_allocations = Labels(instance_parent=graph_node.get_name()) diff --git a/fabric_cf/actor/core/policy/network_service_inventory.py b/fabric_cf/actor/core/policy/network_service_inventory.py index d071dc12..d5456b09 100644 --- a/fabric_cf/actor/core/policy/network_service_inventory.py +++ b/fabric_cf/actor/core/policy/network_service_inventory.py @@ -27,7 +27,7 @@ import random import traceback from ipaddress import IPv6Network, IPv4Network -from typing import List, Tuple +from typing import List, Tuple, Union from fim.slivers.capacities_labels import Labels from fim.slivers.gateway import Gateway @@ -38,6 +38,7 @@ from fabric_cf.actor.core.apis.abc_reservation_mixin import ABCReservationMixin from fabric_cf.actor.core.common.constants import Constants from fabric_cf.actor.core.common.exceptions import BrokerException, ExceptionErrorCode +from fabric_cf.actor.core.kernel.reservation_states import ReservationOperation from fabric_cf.actor.core.policy.inventory_for_type import InventoryForType from fabric_cf.actor.core.util.id import ID @@ -63,13 +64,16 @@ def __extract_vlan_range(*, labels: Labels) -> List[int] or None: vlan_range = [int(labels.vlan)] return vlan_range - def __exclude_allocated_vlans(self, *, available_vlan_range: List[int], bqm_ifs: InterfaceSliver, + def __exclude_allocated_vlans(self, *, rid: ID, available_vlan_range: List[int], bqm_ifs: InterfaceSliver, existing_reservations: List[ABCReservationMixin]) -> List[int]: # Exclude the already allocated VLANs and subnets if existing_reservations is None: return available_vlan_range for reservation in existing_reservations: + if rid == reservation.get_reservation_id(): + continue + # For Active or Ticketed or Ticketing reservations; reduce the counts from available allocated_sliver = None if reservation.is_ticketing() and reservation.get_approved_resources() is not None: @@ -110,9 +114,10 @@ def __exclude_allocated_vlans(self, *, available_vlan_range: List[int], bqm_ifs: msg=f"No VLANs available!") return available_vlan_range - def allocate_ifs(self, *, requested_ns: NetworkServiceSliver, requested_ifs: InterfaceSliver, + def allocate_ifs(self, *, rid: ID, requested_ns: NetworkServiceSliver, requested_ifs: InterfaceSliver, owner_ns: NetworkServiceSliver, bqm_ifs: InterfaceSliver, - existing_reservations: List[ABCReservationMixin]) -> InterfaceSliver: + existing_reservations: List[ABCReservationMixin], + operation: ReservationOperation = ReservationOperation.Create) -> InterfaceSliver: """ Allocate Interface Sliver - For L2 services, validate the VLAN tag specified is within the allowed range @@ -120,20 +125,21 @@ def allocate_ifs(self, *, requested_ns: NetworkServiceSliver, requested_ifs: Int - grab the VLAN from BQM Site specific NetworkService - exclude the VLAN already assigned to other Interface Sliver on the same port - allocate the first available VLAN to the Interface Sliver + :param rid: Reservation ID :param requested_ns: Requested NetworkService :param requested_ifs: Requested Interface Sliver :param owner_ns: BQM NetworkService identified to serve the InterfaceSliver :param bqm_ifs: BQM InterfaceSliver identified to serve the InterfaceSliver :param existing_reservations: Existing Reservations which also are served by the owner switch + :param operation: Extend/Create/Modify Operation :return Interface Sliver updated with the allocated VLAN tag for FABNetv4 and FABNetv6 services :raises Exception if vlan tag range is not in the valid range for L2 services - Return the sliver updated with the VLAN """ - if requested_ns.get_layer() == NSLayer.L2: - requested_vlan = None - if requested_ifs.labels is not None and requested_ifs.labels.vlan is not None: - requested_vlan = int(requested_ifs.labels.vlan) + requested_vlan = None + if requested_ifs.labels and requested_ifs.labels.vlan: + requested_vlan = int(requested_ifs.labels.vlan) + if requested_ns.get_layer() == NSLayer.L2: # Validate the requested VLAN is in range specified on MPLS Network Service in BQM # Only do this for Non FacilityPorts if bqm_ifs.get_type() != InterfaceType.FacilityPort: @@ -141,73 +147,92 @@ def allocate_ifs(self, *, requested_ns: NetworkServiceSliver, requested_ifs: Int return requested_ifs if owner_ns.get_label_delegations() is None: - if 1 > requested_vlan > 4095: - raise BrokerException(error_code=ExceptionErrorCode.FAILURE, - msg=f"Vlan for L2 service {requested_vlan} " - f"is outside the allowed range 1-4095") - else: - return requested_ifs + if not (1 <= requested_vlan <= 4095): + raise BrokerException( + error_code=ExceptionErrorCode.FAILURE, + msg=f"Vlan for L2 service {requested_vlan} is outside the allowed range 1-4095" + ) + return requested_ifs delegation_id, delegated_label = self.get_delegations(lab_cap_delegations=owner_ns.get_label_delegations()) vlan_range = self.__extract_vlan_range(labels=delegated_label) - if vlan_range is not None and requested_vlan not in vlan_range: - raise BrokerException(error_code=ExceptionErrorCode.FAILURE, - msg=f"Vlan for L2 service {requested_vlan} is outside the available range " - f"{vlan_range}") + if vlan_range and requested_vlan not in vlan_range: + raise BrokerException( + error_code=ExceptionErrorCode.FAILURE, + msg=f"Vlan for L2 service {requested_vlan} is outside the available range {vlan_range}" + ) - # Validate the VLANs vlan_range = self.__extract_vlan_range(labels=bqm_ifs.labels) - if vlan_range is not None: - vlan_range = self.__exclude_allocated_vlans(available_vlan_range=vlan_range, bqm_ifs=bqm_ifs, + if vlan_range: + vlan_range = self.__exclude_allocated_vlans(rid=rid, available_vlan_range=vlan_range, + bqm_ifs=bqm_ifs, existing_reservations=existing_reservations) + if operation == ReservationOperation.Extend: + if requested_vlan and requested_vlan not in vlan_range: + raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, + msg=f"Renew failed: VLAN {requested_vlan} for Interface : " + f"{requested_ifs.get_name()/bqm_ifs.node_id} already in " + f"use by another reservation") + return requested_ifs + if requested_vlan is None: requested_ifs.labels.vlan = str(random.choice(vlan_range)) - #requested_ifs.labels.vlan = str(vlan_range[0]) return requested_ifs if requested_vlan not in vlan_range: - raise BrokerException(error_code=ExceptionErrorCode.FAILURE, - msg=f"Vlan for L2 service {requested_vlan} is outside the available range " - f"{vlan_range}") - + raise BrokerException( + error_code=ExceptionErrorCode.FAILURE, + msg=f"Vlan for L2 service {requested_vlan} is outside the available range {vlan_range}" + ) else: - # Grab Label Delegations - delegation_id, delegated_label = self.get_delegations( - lab_cap_delegations=owner_ns.get_label_delegations()) - - # Get the VLAN range if bqm_ifs.get_type() != InterfaceType.FacilityPort: + delegation_id, delegated_label = self.get_delegations( + lab_cap_delegations=owner_ns.get_label_delegations()) vlan_range = self.__extract_vlan_range(labels=delegated_label) else: vlan_range = self.__extract_vlan_range(labels=bqm_ifs.labels) - if vlan_range is not None: - vlan_range = self.__exclude_allocated_vlans(available_vlan_range=vlan_range, bqm_ifs=bqm_ifs, + if vlan_range: + vlan_range = self.__exclude_allocated_vlans(rid=rid, available_vlan_range=vlan_range, + bqm_ifs=bqm_ifs, existing_reservations=existing_reservations) + + if operation == ReservationOperation.Extend: + if requested_vlan and requested_vlan not in vlan_range: + raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, + msg=f"Renew failed: VLAN {requested_vlan} for Interface : " + f"{requested_ifs.get_name()}/{bqm_ifs.node_id} already in " + f"use by another reservation") + return requested_ifs + if bqm_ifs.get_type() != InterfaceType.FacilityPort: - # Allocate the first available VLAN - #requested_ifs.labels.vlan = str(random.choice(vlan_range)) - requested_ifs.labels.vlan = str(vlan_range[0]) + requested_ifs.labels.vlan = str(random.choice(vlan_range)) requested_ifs.label_allocations = Labels(vlan=requested_ifs.labels.vlan) else: - if requested_ifs.labels is None: + if not requested_ifs.labels: return requested_ifs if requested_ifs.labels.vlan is None: requested_ifs.labels.vlan = str(random.choice(vlan_range)) - requested_ifs.labels.vlan = str(vlan_range[0]) if int(requested_ifs.labels.vlan) not in vlan_range: - raise BrokerException(error_code=ExceptionErrorCode.FAILURE, - msg=f"Vlan for L3 service {requested_ifs.labels.vlan} " - f"is outside the available range " - f"{vlan_range}") + raise BrokerException( + error_code=ExceptionErrorCode.FAILURE, + msg=f"Vlan for L3 service {requested_ifs.labels.vlan} is outside the " + f"available range {vlan_range}" + ) return requested_ifs def __allocate_ip_address_to_ifs(self, *, requested_ns: NetworkServiceSliver) -> NetworkServiceSliver: + """ + Allocate IP addresses to the interfaces of the requested network service sliver. + + :param requested_ns: The requested network service sliver. + :return: The updated network service sliver with allocated IP addresses. + """ if requested_ns.gateway is None: return requested_ns @@ -301,8 +326,7 @@ def allocate(self, *, rid: ID, requested_ns: NetworkServiceSliver, owner_ns: Net :param requested_ns: Requested NetworkService :param owner_ns: BQM Network Service identified to serve the NetworkService :param existing_reservations: Existing Reservations which also are served by the owner switch - :return NetworkService updated with the allocated subnet for FABNetv4 and FABNetv6 services - Return the sliver updated with the subnet + :return: NetworkService updated with the allocated subnet for FABNetv4 and FABNetv6 services """ try: if requested_ns.get_type() not in Constants.L3_SERVICES: @@ -312,12 +336,11 @@ def allocate(self, *, rid: ID, requested_ns: NetworkServiceSliver, owner_ns: Net delegation_id, delegated_label = self.get_delegations(lab_cap_delegations=owner_ns.get_label_delegations()) # HACK to use FabNetv6 for FabNetv6Ext as both have the same range - # Needs to be removed if FabNetv6/FabNetv6Ext are configured with different ranges requested_ns_type = requested_ns.get_type() if requested_ns_type == ServiceType.FABNetv6Ext: requested_ns_type = ServiceType.FABNetv6 - # Hack End + # Handle L3VPN type specifically if requested_ns_type == ServiceType.L3VPN: if requested_ns.labels is not None: requested_ns.labels = Labels.update(requested_ns.labels, asn=delegated_label.asn) @@ -325,111 +348,195 @@ def allocate(self, *, rid: ID, requested_ns: NetworkServiceSliver, owner_ns: Net requested_ns.labels = Labels(asn=delegated_label.asn) return requested_ns - subnet_list = None + ip_network, subnet_list = self._generate_subnet_list(owner_ns=owner_ns, delegated_label=delegated_label) - # Get Subnet - if owner_ns.get_type() in Constants.L3_FABNETv6_SERVICES: - ip_network = IPv6Network(delegated_label.ipv6_subnet) - subnet_list = list(ip_network.subnets(new_prefix=64)) - # Exclude the 1st subnet as it is reserved for control plane - subnet_list.pop(0) - # https://github.com/fabric-testbed/ControlFramework/issues/376 - # Exclude the last subnet as the last subnet will be used for the FABRIC STAR Bastion Host Allocation - subnet_list.pop(-1) + # Exclude the already allocated subnets + subnet_list = self._exclude_allocated_subnets(subnet_list=subnet_list, requested_ns_type=requested_ns_type, + rid=rid, existing_reservations=existing_reservations) + + # Extend Case + if requested_ns.get_node_map(): + self._can_extend(subnet_list=subnet_list, requested_ns=requested_ns) + return requested_ns + + gateway_labels = self._assign_gateway_labels(ip_network=ip_network, subnet_list=subnet_list, + requested_ns_type=requested_ns.get_type()) + + self.logger.debug(f"Gateway Labels: {gateway_labels}") + + requested_ns.gateway = Gateway(lab=gateway_labels) - elif owner_ns.get_type() == ServiceType.FABNetv4: - ip_network = IPv4Network(delegated_label.ipv4_subnet) + # Allocate the IP Addresses for the requested NS + requested_ns = self.__allocate_ip_address_to_ifs(requested_ns=requested_ns) + except BrokerException as e: + raise e + except Exception as e: + self.logger.error(f"Error in allocate_gateway_for_ns: {e}") + self.logger.error(traceback.format_exc()) + raise BrokerException(msg=f"Allocation failure for Requested Network Service: {e}") + + return requested_ns + + def _generate_subnet_list(self, *, owner_ns: NetworkServiceSliver, + delegated_label: Labels) -> Tuple[Union[IPv4Network, IPv6Network], List]: + """ + Generate the list of subnets based on the owner network service type. + + :param owner_ns: The NetworkServiceSliver representing the owner network service. + :param delegated_label: The Labels object containing the delegated subnet information. + :return: A tuple containing the IP network and the list of generated subnets. + """ + subnet_list = None + ip_network = None + if owner_ns.get_type() in Constants.L3_FABNETv6_SERVICES: + ip_network = IPv6Network(delegated_label.ipv6_subnet) + subnet_list = list(ip_network.subnets(new_prefix=64)) + # Exclude the 1st subnet as it is reserved for control plane + subnet_list.pop(0) + # Exclude the last subnet for FABRIC STAR Bastion Host Allocation + subnet_list.pop(-1) + + elif owner_ns.get_type() in [ServiceType.FABNetv4, ServiceType.FABNetv4Ext]: + ip_network = IPv4Network(delegated_label.ipv4_subnet) + if owner_ns.get_type() == ServiceType.FABNetv4: subnet_list = list(ip_network.subnets(new_prefix=24)) - # Exclude the 1st subnet as it is reserved for control plane subnet_list.pop(0) elif owner_ns.get_type() == ServiceType.FABNetv4Ext: - ip_network = IPv4Network(delegated_label.ipv4_subnet) subnet_list = list(ip_network.hosts()) - # Exclude the already allocated subnets - for reservation in existing_reservations: - if rid == reservation.get_reservation_id(): - continue - # For Active or Ticketed or Ticketing reservations; reduce the counts from available - allocated_sliver = None - if reservation.is_ticketing() and reservation.get_approved_resources() is not None: - allocated_sliver = reservation.get_approved_resources().get_sliver() + self.logger.debug(f"Available Subnets: {subnet_list}") - if (reservation.is_active() or reservation.is_ticketed()) and \ - reservation.get_resources() is not None: - allocated_sliver = reservation.get_resources().get_sliver() + return ip_network, subnet_list - self.logger.debug(f"Existing res# {reservation.get_reservation_id()} " - f"allocated: {allocated_sliver}") + def _exclude_allocated_subnets(self, *, subnet_list: List, requested_ns_type: str, rid: ID, + existing_reservations: List[ABCReservationMixin]) -> List: + """ + Exclude the subnets that are already allocated. - if allocated_sliver is None: - continue + :param subnet_list: A list of available subnets to be allocated. + :param requested_ns_type: The type of the requested network service. + :param rid: The reservation ID of the current request. + :param existing_reservations: A list of existing reservations that may contain allocated subnets. + :return: A list of subnets excluding those that have already been allocated. + """ + for reservation in existing_reservations: + if rid == reservation.get_reservation_id(): + continue - # HACK to use FabNetv6 for FabNetv6Ext as both have the same range - # Needs to be removed if FabNetv6/FabNetv6Ext are configured with different ranges - allocated_sliver_type = allocated_sliver.get_type() - if allocated_sliver_type == ServiceType.FABNetv6Ext: - allocated_sliver_type = ServiceType.FABNetv6 - # HACK End + allocated_sliver = self._get_allocated_sliver(reservation) + if allocated_sliver is None: + continue - if allocated_sliver_type != requested_ns_type: - continue + # HACK to use FabNetv6 for FabNetv6Ext as both have the same range + # Needs to be removed if FabNetv6/FabNetv6Ext are configured with different ranges + allocated_sliver_type = allocated_sliver.get_type() + if allocated_sliver_type == ServiceType.FABNetv6Ext: + allocated_sliver_type = ServiceType.FABNetv6 + # HACK End + + if allocated_sliver_type != requested_ns_type: + continue - if allocated_sliver.get_type() == ServiceType.FABNetv4: - subnet_to_remove = IPv4Network(allocated_sliver.get_gateway().lab.ipv4_subnet) + if allocated_sliver.get_type() == ServiceType.FABNetv4: + subnet_to_remove = IPv4Network(allocated_sliver.get_gateway().subnet) + self.logger.debug( + f"Excluding already allocated IP4Subnet: " + f"{allocated_sliver.get_gateway().subnet}" + f" to res# {reservation.get_reservation_id()}") + if subnet_to_remove in subnet_list: subnet_list.remove(subnet_to_remove) - self.logger.debug( - f"Excluding already allocated IP4Subnet: " - f"{allocated_sliver.get_gateway().lab.ipv4_subnet}" - f" to res# {reservation.get_reservation_id()}") - - elif allocated_sliver.get_type() == ServiceType.FABNetv4Ext: - if allocated_sliver.labels is not None and allocated_sliver.labels.ipv4 is not None: - for x in allocated_sliver.labels.ipv4: - subnet_to_remove = ipaddress.IPv4Address(x) + + elif allocated_sliver.get_type() == ServiceType.FABNetv4Ext: + if allocated_sliver.labels is not None and allocated_sliver.labels.ipv4 is not None: + for x in allocated_sliver.labels.ipv4: + subnet_to_remove = ipaddress.IPv4Address(x) + self.logger.debug( + f"Excluding already allocated IP4: " + f"{x}" + f" to res# {reservation.get_reservation_id()}") + if subnet_to_remove in subnet_list: subnet_list.remove(subnet_to_remove) - self.logger.debug( - f"Excluding already allocated IPv4: {x}" - f" to res# {reservation.get_reservation_id()}") - elif allocated_sliver.get_type() in Constants.L3_FABNETv6_SERVICES: - subnet_to_remove = IPv6Network(allocated_sliver.get_gateway().lab.ipv6_subnet) + elif allocated_sliver.get_type() in Constants.L3_FABNETv6_SERVICES: + subnet_to_remove = IPv6Network(allocated_sliver.get_gateway().subnet) + self.logger.debug( + f"Excluding already allocated IP6Subnet: " + f"{allocated_sliver.get_gateway().subnet}" + f" to res# {reservation.get_reservation_id()}") + + if subnet_to_remove in subnet_list: subnet_list.remove(subnet_to_remove) - self.logger.debug( - f"Excluding already allocated IPv6Subnet: " - f"{allocated_sliver.get_gateway().lab.ipv6_subnet}" - f" to res# {reservation.get_reservation_id()}") - gateway_labels = Labels() - if requested_ns.get_type() == ServiceType.FABNetv4: - gateway_labels.ipv4_subnet = subnet_list[0].with_prefixlen - gateway_labels.ipv4 = str(list(subnet_list[0].hosts())[0]) + self.logger.debug(f"Excluding already allocated subnet for reservation {reservation.get_reservation_id()}") - elif requested_ns.get_type() == ServiceType.FABNetv4Ext: - gateway_labels.ipv4_subnet = ip_network.with_prefixlen - gateway_labels.ipv4 = str(subnet_list[0]) + return subnet_list - elif requested_ns.get_type() in Constants.L3_FABNETv6_SERVICES: - gateway_labels.ipv6_subnet = subnet_list[0].with_prefixlen - gateway_labels.ipv6 = str(next(subnet_list[0].hosts())) + def _get_allocated_sliver(self, reservation: ABCReservationMixin) -> NetworkServiceSliver: + """ + Retrieve the allocated sliver from the reservation. - self.logger.debug(f"Gateway Labels: {gateway_labels}") + :param reservation: An instance of ABCReservationMixin representing the reservation to retrieve the sliver from. + :return: The allocated NetworkServiceSliver if available, otherwise None. + """ + if reservation.is_ticketing() and reservation.get_approved_resources() is not None: + return reservation.get_approved_resources().get_sliver() + if (reservation.is_active() or reservation.is_ticketed()) and reservation.get_resources() is not None: + return reservation.get_resources().get_sliver() - requested_ns.gateway = Gateway(lab=gateway_labels) + self.logger.error("Could not find the allocated Sliver - should not reach here!") - # Allocate the IP Addresses for the requested NS - requested_ns = self.__allocate_ip_address_to_ifs(requested_ns=requested_ns) - except Exception as e: - self.logger.error(f"Error in allocate_gateway_for_ns: {e}") - self.logger.error(traceback.format_exc()) - raise BrokerException(msg=f"Allocation failure for Requested Network Service: {e}") - return requested_ns + def _assign_gateway_labels(self, *, ip_network: Union[IPv4Network, IPv6Network], subnet_list: List, + requested_ns_type: str) -> Labels: + """ + Assign gateway labels based on the requested network service type. + + :param ip_network: The IP network from which subnets are derived, either IPv4Network or IPv6Network. + :param subnet_list: A list of subnets derived from the ip_network. + :param requested_ns_type: The type of the requested network service. + :return: Gateway labels populated with the appropriate subnet and IP address. + """ + gateway_labels = Labels() + if requested_ns_type == ServiceType.FABNetv4: + gateway_labels.ipv4_subnet = subnet_list[0].with_prefixlen + gateway_labels.ipv4 = str(list(subnet_list[0].hosts())[0]) + + elif requested_ns_type == ServiceType.FABNetv4Ext: + gateway_labels.ipv4_subnet = ip_network.with_prefixlen + gateway_labels.ipv4 = str(subnet_list[0]) + + elif requested_ns_type in Constants.L3_FABNETv6_SERVICES: + gateway_labels.ipv6_subnet = subnet_list[0].with_prefixlen + gateway_labels.ipv6 = str(next(subnet_list[0].hosts())) + + self.logger.debug(f"Allocated Gateway Labels for Network Service: {gateway_labels}") + + return gateway_labels + + def _can_extend(self, *, subnet_list: List, requested_ns: NetworkServiceSliver): + if requested_ns.get_type() == ServiceType.FABNetv4: + allocated_subnet = ipaddress.IPv4Network(requested_ns.gateway.subnet) + if allocated_subnet not in subnet_list: + raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, + msg=f"Renew failed: Subnet {requested_ns.gateway.subnet} for Network Service : " + f"{requested_ns.get_type()} already in use by another reservation") + elif requested_ns.get_type() == ServiceType.FABNetv4Ext: + if requested_ns.gateway.gateway in subnet_list: + raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, + msg=f"Renew failed: Subnet {requested_ns.gateway.subnet} for Network Service : " + f"{requested_ns.get_type()} already in use by another reservation") + + elif requested_ns.get_type() in Constants.L3_FABNETv6_SERVICES: + allocated_subnet = ipaddress.IPv6Network(requested_ns.gateway.subnet) + if allocated_subnet not in subnet_list: + raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, + msg=f"Renew failed: Subnet {requested_ns.gateway.subnet} for Network Service : " + f"{requested_ns.get_type()} already in use by another reservation") def free(self, *, count: int, request: dict = None, resource: dict = None) -> dict: pass - def allocate_peered_ifs(self, *, owner_switch: NodeSliver, + def allocate_peered_ifs(self, *, rid: ID, owner_switch: NodeSliver, requested_ifs: InterfaceSliver, bqm_interface: InterfaceSliver, existing_reservations: List[ABCReservationMixin]) -> InterfaceSliver: """ @@ -452,9 +559,16 @@ def allocate_peered_ifs(self, *, owner_switch: NodeSliver, if bqm_interface.labels.vlan_range is not None: vlan_range = self.__extract_vlan_range(labels=bqm_interface.labels) - available_vlans = self.__exclude_allocated_vlans(available_vlan_range=vlan_range, bqm_ifs=bqm_interface, + available_vlans = self.__exclude_allocated_vlans(rid=rid, available_vlan_range=vlan_range, bqm_ifs=bqm_interface, existing_reservations=existing_reservations) + # Extend case + if requested_ifs.get_node_map() and requested_ifs.labels and requested_ifs.labels.vlan: + if int(requested_ifs.labels.vlan) in available_vlans: + raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, + msg=f"Renew failed: VLAN {requested_ifs.labels.vlan} for Interface : " + f"{requested_ifs.get_name()} already in use by another reservation") + vlan = str(random.choice(available_vlans)) #vlan = str(available_vlans[0]) ifs_labels = Labels.update(ifs_labels, vlan=vlan) diff --git a/fabric_cf/actor/db/__init__.py b/fabric_cf/actor/db/__init__.py index 8254565e..11f85f9b 100644 --- a/fabric_cf/actor/db/__init__.py +++ b/fabric_cf/actor/db/__init__.py @@ -125,6 +125,8 @@ class Reservations(Base): rsv_slc_id = Column(Integer, ForeignKey(FOREIGN_KEY_SLICE_ID), index=True) rsv_resid = Column(String, nullable=False, index=True) oidc_claim_sub = Column(String, nullable=True, index=True) + host = Column(String, nullable=True, index=True) + ip_subnet = Column(String, nullable=True, index=True) email = Column(String, nullable=True, index=True) project_id = Column(String, nullable=True, index=True) site = Column(String, nullable=True, index=True) @@ -143,6 +145,8 @@ class Reservations(Base): Index('idx_resid_state', rsv_resid, rsv_state) Index('idx_slcid_state', rsv_slc_id, rsv_state) Index('idx_graph_id_res_id', rsv_graph_node_id, rsv_resid) + Index('idx_host', host) + Index('idx_ip_subnet', ip_subnet) class Slices(Base): diff --git a/fabric_cf/actor/db/psql_database.py b/fabric_cf/actor/db/psql_database.py index 2300dc03..fc4cfb0b 100644 --- a/fabric_cf/actor/db/psql_database.py +++ b/fabric_cf/actor/db/psql_database.py @@ -656,7 +656,7 @@ def add_reservation(self, *, slc_guid: str, rsv_resid: str, rsv_category: int, r rsv_pending: int, rsv_joining: int, properties, lease_start: datetime = None, lease_end: datetime = None, rsv_graph_node_id: str = None, oidc_claim_sub: str = None, email: str = None, project_id: str = None, site: str = None, rsv_type: str = None, - components: List[Tuple[str, str, str]] = None): + components: List[Tuple[str, str, str]] = None, host: str = None, ip_subnet: str = None): """ Add a reservation @param slc_guid slice guid @@ -675,6 +675,8 @@ def add_reservation(self, *, slc_guid: str, rsv_resid: str, rsv_category: int, r @param site site @param rsv_type reservation type @param components list of components + @param host host + @param ip_subnet ip_subnet """ session = self.get_session() try: @@ -683,7 +685,7 @@ def add_reservation(self, *, slc_guid: str, rsv_resid: str, rsv_category: int, r rsv_state=rsv_state, rsv_pending=rsv_pending, rsv_joining=rsv_joining, lease_start=lease_start, lease_end=lease_end, properties=properties, oidc_claim_sub=oidc_claim_sub, email=email, - project_id=project_id, site=site, rsv_type=rsv_type) + project_id=project_id, site=site, rsv_type=rsv_type, host=host, ip_subnet=ip_subnet) if rsv_graph_node_id is not None: rsv_obj.rsv_graph_node_id = rsv_graph_node_id @@ -704,7 +706,8 @@ def add_reservation(self, *, slc_guid: str, rsv_resid: str, rsv_category: int, r def update_reservation(self, *, slc_guid: str, rsv_resid: str, rsv_category: int, rsv_state: int, rsv_pending: int, rsv_joining: int, properties, lease_start: datetime = None, lease_end: datetime = None, rsv_graph_node_id: str = None, site: str = None, - rsv_type: str = None, components: List[Tuple[str, str, str]] = None): + rsv_type: str = None, components: List[Tuple[str, str, str]] = None, + host: str = None, ip_subnet: str = None): """ Update a reservation @param slc_guid slice guid @@ -720,6 +723,8 @@ def update_reservation(self, *, slc_guid: str, rsv_resid: str, rsv_category: int @param site site @param rsv_type reservation type @param components list of components + @param ip_subnet ip subnet + @param host host """ session = self.get_session() try: @@ -732,6 +737,10 @@ def update_reservation(self, *, slc_guid: str, rsv_resid: str, rsv_category: int rsv_obj.properties = properties rsv_obj.lease_end = lease_end rsv_obj.lease_start = lease_start + if host: + rsv_obj.host = host + if ip_subnet: + rsv_obj.ip_subnet = ip_subnet if site is not None: rsv_obj.site = site if rsv_graph_node_id is not None: @@ -797,7 +806,8 @@ def remove_reservation(self, *, rsv_resid: str): raise e def create_reservation_filter(self, *, slice_id: str = None, graph_node_id: str = None, project_id: str = None, - email: str = None, oidc_sub: str = None, rid: str = None, site: str = None) -> dict: + email: str = None, oidc_sub: str = None, rid: str = None, site: str = None, + ip_subnet: str = None, host: str = None) -> dict: filter_dict = {} if slice_id is not None: @@ -815,12 +825,18 @@ def create_reservation_filter(self, *, slice_id: str = None, graph_node_id: str filter_dict['rsv_resid'] = rid if site is not None: filter_dict['site'] = site + if ip_subnet: + filter_dict['ip_subnet'] = ip_subnet + if host: + filter_dict['host'] = host + return filter_dict def get_reservations(self, *, slice_id: str = None, graph_node_id: str = None, project_id: str = None, email: str = None, oidc_sub: str = None, rid: str = None, states: list[int] = None, category: list[int] = None, site: str = None, rsv_type: list[str] = None, - start: datetime = None, end: datetime = None) -> List[dict]: + start: datetime = None, end: datetime = None, ip_subnet: str = None, + host: str = None) -> List[dict]: """ Get Reservations for an actor @param slice_id slice id @@ -835,6 +851,8 @@ def get_reservations(self, *, slice_id: str = None, graph_node_id: str = None, p @param rsv_type rsv_type @param start search for slivers with lease_end_time after start @param end search for slivers with lease_end_time before end + @param ip_subnet ip subnet + @param host host @return list of reservations """ @@ -843,7 +861,7 @@ def get_reservations(self, *, slice_id: str = None, graph_node_id: str = None, p try: filter_dict = self.create_reservation_filter(slice_id=slice_id, graph_node_id=graph_node_id, project_id=project_id, email=email, oidc_sub=oidc_sub, - rid=rid, site=site) + rid=rid, site=site, ip_subnet=ip_subnet, host=host) rows = session.query(Reservations).filter_by(**filter_dict) if rsv_type is not None: @@ -885,7 +903,8 @@ def get_reservations(self, *, slice_id: str = None, graph_node_id: str = None, p return result def get_components(self, *, node_id: str, states: list[int], rsv_type: list[str], component: str = None, - bdf: str = None, start: datetime = None, end: datetime = None) -> Dict[str, List[str]]: + bdf: str = None, start: datetime = None, end: datetime = None, + excludes: List[str] = None) -> Dict[str, List[str]]: """ Returns components matching the search criteria @param node_id: Worker Node ID to which components belong @@ -895,6 +914,7 @@ def get_components(self, *, node_id: str, states: list[int], rsv_type: list[str] @param bdf: Component's PCI address @param start: start time @param end: end time + @param excludes: list of the reservations ids to exclude NOTE# For P4 switches; node_id=node+renc-p4-sw component=ip+192.168.11.8 bdf=p1 @@ -928,6 +948,10 @@ def get_components(self, *, node_id: str, states: list[int], rsv_type: list[str] .options(joinedload(Components.reservation)) ) + # Add excludes filter if excludes list is not None and not empty + if excludes: + rows = rows.filter(Reservations.rsv_resid.notin_(excludes)) + # Query Component records for reservations in the specified state and owner with the target string if component is not None and bdf is not None: rows = rows.filter(Components.component == component, Components.bdf == bdf) @@ -1876,3 +1900,8 @@ def test3(): test2() #test() #test3() + + logger = logging.getLogger('PsqlDatabase') + db = PsqlDatabase(user='fabric', password='fabric', database='orchestrator', db_host='127.0.0.1:5432', + logger=logger) + comps = db.get_components(node_id="HX7LQ53") \ No newline at end of file diff --git a/fabric_cf/orchestrator/core/orchestrator_handler.py b/fabric_cf/orchestrator/core/orchestrator_handler.py index 5997803c..9ea5979a 100644 --- a/fabric_cf/orchestrator/core/orchestrator_handler.py +++ b/fabric_cf/orchestrator/core/orchestrator_handler.py @@ -514,6 +514,7 @@ def modify_slice(self, *, token: str, slice_id: str, slice_graph: str) -> List[d config_props[Constants.TAGS] = ','.join(tags) config_props[Constants.TOKEN_HASH] = fabric_token.token_hash slice_obj.set_config_properties(value=config_props) + slice_obj.state = SliceState.Modifying.value if not controller.update_slice(slice_obj=slice_obj, modify_state=modify_state): self.logger.error(f"Failed to update slice: {slice_id} error: {controller.get_last_error()}") @@ -711,6 +712,7 @@ def renew_slice(self, *, token: str, slice_id: str, new_lease_end_time: datetime :return: """ failed_to_extend_rid_list = [] + extend_rid_list = [] try: controller = self.controller_state.get_management_actor() self.logger.debug(f"renew_slice invoked for Controller: {controller}") @@ -765,6 +767,9 @@ def renew_slice(self, *, token: str, slice_id: str, new_lease_end_time: datetime if new_end_time < current_end_time: raise OrchestratorException(f"Attempted new term end time is shorter than current slice end time") + if new_end_time == current_end_time: + continue + self.logger.debug(f"Extending reservation with reservation# {r.get_reservation_id()}") result = controller.extend_reservation(reservation=ID(uid=r.get_reservation_id()), new_end_time=new_end_time, @@ -772,16 +777,25 @@ def renew_slice(self, *, token: str, slice_id: str, new_lease_end_time: datetime if not result: self.logger.error(f"Error: {controller.get_last_error()}") failed_to_extend_rid_list.append(r.get_reservation_id()) + else: + extend_rid_list.append(r.get_reservation_id()) + ''' if len(failed_to_extend_rid_list) == 0: slice_object.set_lease_end(lease_end=new_end_time) if not controller.update_slice(slice_obj=slice_object): self.logger.error(f"Failed to update lease end time: {new_end_time} in Slice: {slice_object}") self.logger.error(controller.get_last_error()) + ''' if len(failed_to_extend_rid_list) > 0: raise OrchestratorException(f"Failed to extend reservation# {failed_to_extend_rid_list}") + if len(extend_rid_list): + slice_object.state = SliceState.Configuring.value + if not controller.update_slice(slice_obj=slice_object, modify_state=True): + self.logger.error(f"Failed to update slice: {slice_id} error: {controller.get_last_error()}") + EventLoggerSingleton.get().log_slice_event(slice_object=slice_object, action=ActionId.renew) except Exception as e: self.logger.error(traceback.format_exc()) diff --git a/fabric_cf/orchestrator/test/test.yaml b/fabric_cf/orchestrator/test/test.yaml index c875f443..f8641d73 100644 --- a/fabric_cf/orchestrator/test/test.yaml +++ b/fabric_cf/orchestrator/test/test.yaml @@ -48,6 +48,7 @@ runtime: commit.batch.size: 1 enable.auto.commit: False consumer.poll.timeout: 250 + infrastructure.project.id: 4604cab7-41ff-4c1a-a935-0ca6f20cceeb logging: ## The directory in which actor should create log files. diff --git a/psql.upgrade b/psql.upgrade index a6a6e9b9..05603177 100644 --- a/psql.upgrade +++ b/psql.upgrade @@ -78,4 +78,10 @@ CREATE TABLE IF NOT EXISTS "Metrics" ( user_id VARCHAR NOT NULL, project_id VARCHAR NOT NULL, slice_count INTEGER NOT NULL, -); \ No newline at end of file +); + +ALTER TABLE "Reservations" ADD COLUMN host VARCHAR(255) NULL; +ALTER TABLE "Reservations" ADD COLUMN ip_subnet VARCHAR(255) NULL; + +CREATE INDEX idx_host ON "Reservations"(host); +CREATE INDEX idx_ip_subnet ON "Reservations"(ip_subnet); \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 056d31b3..df1f98eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,10 +26,10 @@ dependencies = [ "connexion==2.14.2", "swagger-ui-bundle==0.0.9", "PyYAML", - "fabric_fss_utils==1.5.0", - "fabric-message-bus==1.7.0b1", - "fabric-fim==1.7.0b13", - "fabric-credmgr-client==1.6.0", + "fabric_fss_utils==1.5.1", + "fabric-message-bus==1.7.0rc0", + "fabric-fim==1.7.0", + "fabric-credmgr-client==1.6.1", "ansible" ]