Skip to content

Commit

Permalink
separate handling for extend and modify
Browse files Browse the repository at this point in the history
  • Loading branch information
kthare10 committed Jul 10, 2024
1 parent 7090a6e commit 7e444aa
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 28 deletions.
13 changes: 13 additions & 0 deletions fabric_cf/actor/core/kernel/reservation_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,22 @@
#
#
# Author: Komal Thareja ([email protected])
import enum
from enum import Enum


class ReservationOperation(enum.Enum):
Create = enum.auto(),
Modify = enum.auto(),
Extend = enum.auto()

def __repr__(self):
return self.name

def __str__(self):
return self.name


class ReservationStates(Enum):
"""
Reservation states
Expand Down
30 changes: 19 additions & 11 deletions fabric_cf/actor/core/policy/broker_simpler_units_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
from fabric_cf.actor.core.container.maintenance import Maintenance
from fabric_cf.actor.core.delegation.resource_ticket import ResourceTicketFactory
from fabric_cf.actor.core.common.exceptions import BrokerException, ExceptionErrorCode
from fabric_cf.actor.core.kernel.reservation_states import ReservationStates
from fabric_cf.actor.core.kernel.reservation_states import ReservationStates, ReservationOperation
from fabric_cf.actor.core.policy.broker_calendar_policy import BrokerCalendarPolicy
from fabric_cf.actor.core.policy.fifo_queue import FIFOQueue
from fabric_cf.actor.core.policy.network_node_inventory import NetworkNodeInventory
Expand Down Expand Up @@ -477,7 +477,8 @@ def ticket(self, *, reservation: ABCBrokerReservation, node_id_to_reservations:
self.logger.debug(f"Inventory type: {type(inv)}")
term = Term(start=start, end=end)
return self.ticket_inventory(reservation=reservation, inv=inv, term=term,
node_id_to_reservations=node_id_to_reservations)
node_id_to_reservations=node_id_to_reservations,
operation=ReservationOperation.Create)
else:
reservation.fail(message=Constants.NO_POOL)
else:
Expand Down Expand Up @@ -557,7 +558,8 @@ def __prune_nodes_in_maintenance(self, node_id_list: List[str], site: str, reser
return node_id_list

def __find_first_fit(self, node_id_list: List[str], node_id_to_reservations: dict, inv: NetworkNodeInventory,
reservation: ABCBrokerReservation, term: Term, sliver: NodeSliver) -> Tuple[str, BaseSliver, Any]:
reservation: ABCBrokerReservation, term: Term, sliver: NodeSliver,
operation: ReservationOperation = ReservationOperation.Create) -> Tuple[str, BaseSliver, Any]:
"""
Find First Available Node which can serve the reservation
@param node_id_list: Candidate Nodes
Expand All @@ -569,7 +571,6 @@ def __find_first_fit(self, node_id_list: List[str], node_id_to_reservations: dic
delegation_id = None
error_msg = None
self.logger.debug(f"Possible candidates to serve {reservation} candidates# {node_id_list}")
is_create = sliver.get_node_map() is None
for node_id in node_id_list:
try:
self.logger.debug(f"Attempting to allocate {reservation} via graph_node# {node_id}")
Expand Down Expand Up @@ -597,7 +598,7 @@ def __find_first_fit(self, node_id_list: List[str], node_id_to_reservations: dic
graph_node=graph_node,
existing_reservations=existing_reservations,
existing_components=existing_components,
is_create=is_create)
operation=operation)

if delegation_id is not None and sliver is not None:
break
Expand All @@ -616,7 +617,8 @@ def __find_first_fit(self, node_id_list: List[str], node_id_to_reservations: dic
return delegation_id, sliver, error_msg

def __allocate_nodes(self, *, reservation: ABCBrokerReservation, inv: NetworkNodeInventory, sliver: NodeSliver,
node_id_to_reservations: dict, term: Term) -> Tuple[str or None, BaseSliver, Any]:
node_id_to_reservations: dict, term: Term,
operation: ReservationOperation = ReservationOperation.Create) -> Tuple[str or None, BaseSliver, Any]:
"""
Allocate Network Node Slivers
@param reservation Reservation
Expand Down Expand Up @@ -647,7 +649,8 @@ def __allocate_nodes(self, *, reservation: ABCBrokerReservation, inv: NetworkNod

return self.__find_first_fit(node_id_list=node_id_list,
node_id_to_reservations=node_id_to_reservations,
inv=inv, reservation=reservation, term=term, sliver=sliver)
inv=inv, reservation=reservation, term=term, sliver=sliver,
operation=operation)

def __allocate_services(self, *, rid: ID, inv: NetworkServiceInventory, sliver: NetworkServiceSliver,
node_id_to_reservations: dict, term: Term) -> Tuple[str, BaseSliver, Any]:
Expand Down Expand Up @@ -962,10 +965,11 @@ def __allocate_peered_interfaces(self, *, rid: ID, peered_interfaces: List[Inter
sliver.set_node_map(node_map=(self.combined_broker_model_graph_id, owner_ns.node_id))

def ticket_inventory(self, *, reservation: ABCBrokerReservation, inv: InventoryForType, term: Term,
node_id_to_reservations: dict, extend: bool = False) -> Tuple[bool, dict, Any]:
node_id_to_reservations: dict,
operation: ReservationOperation = ReservationOperation.Create) -> Tuple[bool, dict, Any]:
error_msg = None
try:
if extend:
if operation == ReservationOperation.Extend:
rset = reservation.get_resources()
else:
rset = reservation.get_requested_resources()
Expand All @@ -984,7 +988,8 @@ def ticket_inventory(self, *, reservation: ABCBrokerReservation, inv: InventoryF
delegation_id, sliver, error_msg = self.__allocate_nodes(reservation=reservation, inv=inv,
sliver=res_sliver,
node_id_to_reservations=node_id_to_reservations,
term=term)
term=term,
operation=operation)

elif isinstance(res_sliver, NetworkServiceSliver):
delegation_id, sliver, error_msg = self.__allocate_services(rid=reservation.get_reservation_id(),
Expand Down Expand Up @@ -1038,20 +1043,23 @@ def extend_private(self, *, reservation: ABCBrokerReservation, inv: InventoryFor
sliver = current_resources.get_sliver()
diff = sliver.diff(other_sliver=requested_resources.get_sliver())

operation = ReservationOperation.Extend
if diff is not None:
sliver = requested_resources.get_sliver()
operation = ReservationOperation.Modify

#if diff is None or diff.added is None or \
# (len(diff.added.components) == 0 and len(diff.added.interfaces) == 0) or \
# self.__is_modify_on_openstack_vnic(sliver=sliver):

if self.__is_modify_on_openstack_vnic(sliver=sliver):
self.issue_ticket(reservation=reservation, units=needed, rtype=requested_resources.get_type(),
term=term, source=reservation.get_source(), sliver=sliver)
else:
status, node_id_to_reservations, error_msg = self.ticket_inventory(reservation=reservation,
inv=inv, term=term,
node_id_to_reservations=node_id_to_reservations,
extend=True)
operation=operation)
if not status and not reservation.is_failed():
fail_message = f"Insufficient resources for specified start time, Failing reservation: " \
f"{reservation.get_reservation_id()}"
Expand Down
1 change: 1 addition & 0 deletions fabric_cf/actor/core/policy/inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#
# Author: Komal Thareja ([email protected])
from __future__ import annotations

from typing import TYPE_CHECKING

from fabric_cf.actor.core.common.constants import Constants
Expand Down
42 changes: 25 additions & 17 deletions fabric_cf/actor/core/policy/network_node_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from fabric_cf.actor.core.apis.abc_reservation_mixin import ABCReservationMixin
from fabric_cf.actor.core.common.constants import Constants
from fabric_cf.actor.core.common.exceptions import BrokerException, ExceptionErrorCode
from fabric_cf.actor.core.kernel.reservation_states import ReservationOperation
from fabric_cf.actor.core.policy.inventory_for_type import InventoryForType
from fabric_cf.actor.core.util.id import ID

Expand Down Expand Up @@ -230,13 +231,13 @@ def __update_smart_nic_labels_and_capacities(self, *, available_component: Compo

def __check_component_labels_and_capacities(self, *, available_component: ComponentSliver, graph_id: str,
requested_component: ComponentSliver,
is_create: bool = False) -> ComponentSliver:
operation: ReservationOperation = ReservationOperation.Create) -> ComponentSliver:
"""
Check if available component capacities, labels to match requested component
:param available_component: available component
:param graph_id: BQM graph id
:param requested_component: requested component
:param is_create: is_create
:param operation: operation
:return: requested component annotated with properties in case of success, None otherwise
"""
if requested_component.get_model() is not None and \
Expand Down Expand Up @@ -271,7 +272,7 @@ def __check_component_labels_and_capacities(self, *, available_component: Compon

node_map = tuple([graph_id, available_component.node_id])
requested_component.set_node_map(node_map=node_map)
if requested_component.labels is None or is_create:
if requested_component.labels is None or operation == ReservationOperation.Create:
requested_component.labels = Labels.update(lab=requested_component.get_label_allocations())

return requested_component
Expand Down Expand Up @@ -391,18 +392,20 @@ def __exclude_components_for_existing_reservations(self, *, rid: ID, graph_node:
def __check_components(self, *, rid: ID, requested_components: AttachedComponentsInfo, graph_id: str,
graph_node: NodeSliver, existing_reservations: List[ABCReservationMixin],
existing_components: Dict[str, List[str]],
is_create: bool = False) -> AttachedComponentsInfo:
operation: ReservationOperation = ReservationOperation.Create) -> AttachedComponentsInfo:
"""
Check if the requested capacities can be satisfied with the available capacities
:param rid: reservation id of the reservation being served
:param requested_components: Requested components
:param graph_id: BQM graph id
:param graph_node: BQM graph node identified to serve the reservation
:param existing_reservations: Existing Reservations served by the same BQM node
:param is_create: Flag indicating if this is create or modify
:param operation: Flag indicating if this is create or modify
:return: Components updated with the corresponding BQM node ids
:raises: BrokerException in case the request cannot be satisfied
"""
self.logger.debug(f"available_components after excluding allocated components: {graph_node.attached_components_info.devices.keys()}")

self.__exclude_components_for_existing_reservations(rid=rid, graph_node=graph_node,
existing_reservations=existing_reservations)

Expand Down Expand Up @@ -433,7 +436,11 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent

self.logger.debug(f"requested_components: {requested_components.devices.values()} for reservation# {rid}")
for name, requested_component in requested_components.devices.items():
if not is_create and requested_component.get_node_map() is not None:
if operation == ReservationOperation.Modify and requested_component.get_node_map() is not None:
self.logger.debug(f"==========Ignoring Allocated component: {requested_component} for modify")
continue

if operation == ReservationOperation.Extend and requested_component.get_node_map() is not None:
bqm_id, node_id = requested_component.get_node_map()
if requested_component.get_type() == ComponentType.SharedNIC:
allocated_bdfs = existing_components.get(node_id)
Expand All @@ -452,8 +459,9 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent
msg=f"Renew failed: Component of type: {requested_component.get_model()} "
f"not available in graph node: {graph_node.node_id}")

self.logger.debug(f"==========Ignoring Allocated component: {requested_component} for modify")
self.logger.debug(f"==========Ignoring Allocated component: {requested_component} for renew")
continue

self.logger.debug(f"==========Allocating component: {requested_component}")
resource_type = requested_component.get_type()
resource_model = requested_component.get_model()
Expand All @@ -475,7 +483,7 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent
requested_component = self.__check_component_labels_and_capacities(
available_component=component, graph_id=graph_id,
requested_component=requested_component,
is_create=is_create)
operation=operation)

if requested_component.get_node_map() is not None:
self.logger.info(f"Assigning {component.node_id} to component# "
Expand All @@ -495,7 +503,7 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent

def __allocate_p4_switch(self, *, rid: ID, requested_sliver: NodeSliver, graph_id: str, graph_node: NodeSliver,
existing_reservations: List[ABCReservationMixin], existing_components: Dict[str, List[str]],
is_create: bool = False) -> Tuple[str, BaseSliver]:
operation: ReservationOperation = ReservationOperation.Create) -> Tuple[str, BaseSliver]:
"""
Allocate an extending or ticketing reservation for a P4 switch
Expand All @@ -505,14 +513,14 @@ def __allocate_p4_switch(self, *, rid: ID, requested_sliver: NodeSliver, graph_i
:param graph_node: BQM graph node identified to serve the reservation
:param existing_components: Existing Components
:param existing_reservations: Existing Reservations served by the same BQM node
:param is_create: Indicates if this is create or modify
:param operation: Indicates if this is create or modify
:return: Tuple of Delegation Id and the Requested Sliver annotated with BQM Node Id and other properties
:raises: BrokerException in case the request cannot be satisfied
"""
delegation_id = None

if not is_create:
if operation == ReservationOperation.Create:
# In case of modify, directly get delegation_id
if len(graph_node.get_capacity_delegations().get_delegation_ids()) > 0:
delegation_id = next(iter(graph_node.get_capacity_delegations().get_delegation_ids()))
Expand Down Expand Up @@ -546,7 +554,7 @@ def __allocate_p4_switch(self, *, rid: ID, requested_sliver: NodeSliver, graph_i

def allocate(self, *, rid: ID, requested_sliver: BaseSliver, graph_id: str, graph_node: BaseSliver,
existing_reservations: List[ABCReservationMixin], existing_components: Dict[str, List[str]],
is_create: bool = False) -> Tuple[str, BaseSliver]:
operation: ReservationOperation = ReservationOperation.Create) -> Tuple[str, BaseSliver]:
"""
Allocate an extending or ticketing reservation
:param rid: reservation id of the reservation to be allocated
Expand All @@ -555,7 +563,7 @@ def allocate(self, *, rid: ID, requested_sliver: BaseSliver, graph_id: str, grap
:param graph_node: BQM graph node identified to serve the reservation
:param existing_components: Existing Components
:param existing_reservations: Existing Reservations served by the same BQM node
:param is_create: Indicates if this is create or modify
:param operation: Indicates if this is create or modify
:return: Tuple of Delegation Id and the Requested Sliver annotated with BQM Node Id and other properties
:raises: BrokerException in case the request cannot be satisfied
"""
Expand All @@ -578,12 +586,12 @@ def allocate(self, *, rid: ID, requested_sliver: BaseSliver, graph_id: str, grap
if requested_sliver.get_type() == NodeType.Switch:
return self.__allocate_p4_switch(rid=rid, requested_sliver=requested_sliver, graph_id=graph_id,
graph_node=graph_node, existing_reservations=existing_reservations,
existing_components=existing_components, is_create=is_create)
existing_components=existing_components, operation=operation)

delegation_id = None
requested_capacities = None
# For create, we need to allocate the VM
if is_create:
if operation == ReservationOperation.Create:
# Always use requested capacities to be mapped from flavor i.e. capacity hints
requested_capacity_hints = requested_sliver.get_capacity_hints()
catalog = InstanceCatalog()
Expand All @@ -609,10 +617,10 @@ def allocate(self, *, rid: ID, requested_sliver: BaseSliver, graph_id: str, grap
graph_node=graph_node,
existing_reservations=existing_reservations,
existing_components=existing_components,
is_create=is_create)
operation=operation)

# Do this only for create
if is_create:
if operation == ReservationOperation.Create:
requested_sliver.capacity_allocations = Capacities()
requested_sliver.capacity_allocations = Capacities.update(lab=requested_capacities)
requested_sliver.label_allocations = Labels(instance_parent=graph_node.get_name())
Expand Down

0 comments on commit 7e444aa

Please sign in to comment.