From 9b40eef1fe3828e55938115e7f8b8088d6e9e5cd Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Tue, 5 Nov 2024 10:47:40 -0500 Subject: [PATCH] advance scheduling thread to process future slices --- .../actor/core/manage/local/local_actor.py | 3 +- .../core/advance_scheduling_thread.py | 180 ++++++++++++++++++ .../orchestrator/core/orchestrator_handler.py | 12 +- .../orchestrator/core/orchestrator_kernel.py | 85 +++++++-- .../core/orchestrator_slice_wrapper.py | 13 +- .../orchestrator/core/resource_tracker.py | 165 +++++++++------- 6 files changed, 364 insertions(+), 94 deletions(-) create mode 100644 fabric_cf/orchestrator/core/advance_scheduling_thread.py diff --git a/fabric_cf/actor/core/manage/local/local_actor.py b/fabric_cf/actor/core/manage/local/local_actor.py index 32e2ea13..3fb4ff83 100644 --- a/fabric_cf/actor/core/manage/local/local_actor.py +++ b/fabric_cf/actor/core/manage/local/local_actor.py @@ -45,12 +45,11 @@ from fabric_mb.message_bus.messages.reservation_mng import ReservationMng from fabric_mb.message_bus.messages.reservation_state_avro import ReservationStateAvro from fabric_mb.message_bus.messages.slice_avro import SliceAvro - from fabric_cf.actor.core.manage.management_object import ManagementObject from fabric_cf.actor.security.auth_token import AuthToken class LocalActor(LocalProxy, ABCMgmtActor): - def __init__(self, *, manager: ManagementObject, auth: AuthToken): + def __init__(self, *, manager: ActorManagementObject, auth: AuthToken): super().__init__(manager=manager, auth=auth) if not isinstance(manager, ActorManagementObject): diff --git a/fabric_cf/orchestrator/core/advance_scheduling_thread.py b/fabric_cf/orchestrator/core/advance_scheduling_thread.py new file mode 100644 index 00000000..f5a55126 --- /dev/null +++ b/fabric_cf/orchestrator/core/advance_scheduling_thread.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 +# MIT License +# +# Copyright (c) 2020 FABRIC Testbed +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# +# Author: Komal Thareja (kthare10@renci.org) +import queue +import threading +import time +import traceback + +from fabric_cf.actor.core.time.actor_clock import ActorClock + +from fabric_cf.actor.core.kernel.reservation_states import ReservationStates +from fabric_cf.actor.core.util.id import ID +from fabric_cf.actor.core.util.iterable_queue import IterableQueue +from fabric_cf.orchestrator.core.exceptions import OrchestratorException +from fabric_cf.orchestrator.core.orchestrator_slice_wrapper import OrchestratorSliceWrapper + + +class AdvanceSchedulingThread: + """ + This runs as a standalone thread started by Orchestrator and deals with determining the + nearest start time for slices requested in future. + The purpose of this thread is to help orchestrator respond back to the create + without identifying the start time and waiting for the slivers to be demanded + """ + + def __init__(self, *, kernel): + self.slice_queue = queue.Queue() + self.slice_avail_condition = threading.Condition() + self.thread_lock = threading.Lock() + self.thread = None + self.stopped = False + from fabric_cf.actor.core.container.globals import GlobalsSingleton + self.logger = GlobalsSingleton.get().get_logger() + self.mgmt_actor = kernel.get_management_actor() + self.kernel = kernel + + def queue_slice(self, *, controller_slice: OrchestratorSliceWrapper): + """ + Queue a slice + :param controller_slice: + :return: + """ + try: + self.slice_queue.put_nowait(controller_slice) + self.logger.debug(f"Added slice to slices queue {controller_slice.get_slice_id()}") + except Exception as e: + self.logger.error(f"Failed to queue slice: {controller_slice.get_slice_id()} e: {e}") + + def start(self): + """ + Start thread + :return: + """ + try: + self.thread_lock.acquire() + if self.thread is not None: + raise OrchestratorException(f"This {self.__class__.__name__} has already been started") + + self.thread = threading.Thread(target=self.run) + self.thread.setName(self.__class__.__name__) + self.thread.setDaemon(True) + self.thread.start() + + finally: + self.thread_lock.release() + + def stop(self): + """ + Stop thread + :return: + """ + self.stopped = True + try: + self.thread_lock.acquire() + temp = self.thread + self.thread = None + if temp is not None: + self.logger.warning(f"It seems that the {self.__class__.__name__} is running. " + f"Interrupting it") + try: + # TODO find equivalent of interrupt + with self.slice_avail_condition: + self.slice_avail_condition.notify_all() + temp.join() + except Exception as e: + self.logger.error(f"Could not join {self.__class__.__name__} thread {e}") + finally: + self.thread_lock.release() + finally: + if self.thread_lock is not None and self.thread_lock.locked(): + self.thread_lock.release() + + def run(self): + """ + Thread main loop + :return: + """ + self.logger.debug(f"{self.__class__.__name__} started") + while True: + slices = [] + if not self.stopped and self.slice_queue.empty(): + time.sleep(0.001) + + if self.stopped: + self.logger.info(f"{self.__class__.__name__} exiting") + return + + if not self.slice_queue.empty(): + try: + for s in IterableQueue(source_queue=self.slice_queue): + slices.append(s) + except Exception as e: + self.logger.error(f"Error while adding slice to slice queue! e: {e}") + self.logger.error(traceback.format_exc()) + + if len(slices) > 0: + self.logger.debug(f"Processing {len(slices)} slices") + for s in slices: + try: + # Process the Slice i.e. Determine the nearest start time + # If start time found, queue the slice on SliceDeferredThread + # Else move the slice to Close State with failure + self.process_slice(controller_slice=s) + except Exception as e: + self.logger.error(f"Error while processing slice {type(s)}, {e}") + self.logger.error(traceback.format_exc()) + + def process_slice(self, *, controller_slice: OrchestratorSliceWrapper): + """ + Determine nearest start time + :param controller_slice: + :param lease_start_time: Lease Start Time (UTC) + :param lease_end_time: Lease End Time (UTC) + :param lifetime: Lifetime of the slice in hours + """ + computed_reservations = controller_slice.get_computed_reservations() + + try: + controller_slice.lock() + future_start, future_end = self.kernel.determine_future_lease_time( + computed_reservations=computed_reservations, + start=controller_slice.start, end=controller_slice.end, + duration=controller_slice.lifetime) + + controller_slice.slice_obj.set_lease_start(lease_start=future_start) + controller_slice.slice_obj.set_lease_end(lease_end=future_end) + self.logger.debug(f"Update Slice {controller_slice.slice_obj.slice_name}") + self.mgmt_actor.update_slice(slice_obj=controller_slice.slice_obj) + for r in computed_reservations: + r.set_start(value=ActorClock.to_milliseconds(when=future_start)) + r.set_end(value=ActorClock.to_milliseconds(when=future_end)) + self.mgmt_actor.update_reservation(reservation=r) + self.kernel.get_defer_thread().queue_slice(controller_slice=controller_slice) + except Exception as e: + self.logger.error(traceback.format_exc()) + self.logger.error("Unable to get orchestrator or demand reservation: {}".format(e)) + finally: + controller_slice.unlock() diff --git a/fabric_cf/orchestrator/core/orchestrator_handler.py b/fabric_cf/orchestrator/core/orchestrator_handler.py index 1e917be2..ce2060b8 100644 --- a/fabric_cf/orchestrator/core/orchestrator_handler.py +++ b/fabric_cf/orchestrator/core/orchestrator_handler.py @@ -317,13 +317,17 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key new_slice_object.lock() # Create Slivers from Slice Graph; Compute Reservations from Slivers; - computed_reservations = new_slice_object.create(slice_graph=asm_graph) + computed_reservations = new_slice_object.create(slice_graph=asm_graph, + lease_start_time=lease_start_time, + lease_end_time=lease_end_time, + lifetime=lifetime) new_slice_object.update_topology(topology=topology) # Check if Testbed in Maintenance or Site in Maintenance self.check_maintenance_mode(token=fabric_token, reservations=computed_reservations) # TODO Future Slice + ''' if lease_start_time and lease_end_time and lifetime: future_start, future_end = self.controller_state.determine_future_lease_time(computed_reservations=computed_reservations, start=lease_start_time, end=lease_end_time, @@ -336,6 +340,7 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key for r in computed_reservations: r.set_start(value=ActorClock.to_milliseconds(when=future_start)) r.set_end(value=ActorClock.to_milliseconds(when=future_end)) + ''' # Add Reservations to relational database; new_slice_object.add_reservations() @@ -346,7 +351,10 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key # Demand thread is responsible for demanding the reservations # Helps improve the create response time create_ts = time.time() - self.controller_state.get_defer_thread().queue_slice(controller_slice=new_slice_object) + if lease_start_time and lease_end_time and lifetime: + self.controller_state.get_advance_scheduling_thread().queue_slice(controller_slice=controller) + else: + self.controller_state.get_defer_thread().queue_slice(controller_slice=new_slice_object) self.logger.info(f"QU queue: TIME= {time.time() - create_ts:.0f}") EventLoggerSingleton.get().log_slice_event(slice_object=slice_obj, action=ActionId.create, topology=topology) diff --git a/fabric_cf/orchestrator/core/orchestrator_kernel.py b/fabric_cf/orchestrator/core/orchestrator_kernel.py index 174635f0..7cceac8d 100644 --- a/fabric_cf/orchestrator/core/orchestrator_kernel.py +++ b/fabric_cf/orchestrator/core/orchestrator_kernel.py @@ -25,7 +25,9 @@ # Author: Komal Thareja (kthare10@renci.org) import threading from datetime import datetime, timedelta +from heapq import heappush, heappop from http.client import BAD_REQUEST +from typing import List, Iterator, Tuple, Optional from fabric_cf.actor.core.time.actor_clock import ActorClock @@ -44,6 +46,7 @@ from fabric_cf.actor.core.core.event_processor import EventProcessor from fabric_cf.actor.core.manage.management_utils import ManagementUtils from fabric_cf.actor.core.util.id import ID +from fabric_cf.orchestrator.core.advance_scheduling_thread import AdvanceSchedulingThread from fabric_cf.orchestrator.core.bqm_wrapper import BqmWrapper from fabric_cf.orchestrator.core.exceptions import OrchestratorException from fabric_cf.orchestrator.core.reservation_status_update_thread import ReservationStatusUpdateThread @@ -71,6 +74,7 @@ class OrchestratorKernel(ABCTick): def __init__(self): self.defer_thread = None + self.adv_sch_thread = None self.sut = None self.broker = None self.logger = None @@ -130,6 +134,9 @@ def get_broker(self) -> ID: def get_defer_thread(self) -> SliceDeferThread: return self.defer_thread + def get_advance_scheduling_thread(self) -> AdvanceSchedulingThread: + return self.adv_sch_thread + def get_sut(self) -> ReservationStatusUpdateThread: """ Get SUT thread @@ -161,6 +168,8 @@ def stop_threads(self): Stop threads :return: """ + if self.adv_sch_thread: + self.adv_sch_thread.stop() if self.defer_thread is not None: self.defer_thread.stop() if self.event_processor is not None: @@ -199,6 +208,8 @@ def start_threads(self): self.defer_thread.start() self.event_processor = EventProcessor(name="PeriodicProcessor", logger=self.logger) self.event_processor.start() + self.adv_sch_thread = AdvanceSchedulingThread(kernel=self) + self.adv_sch_thread.start() #self.get_logger().debug("Starting ReservationStatusUpdateThread") #self.sut = ReservationStatusUpdateThread() #self.sut.start() @@ -224,6 +235,32 @@ def external_tick(self, *, cycle: int): def get_name(self) -> str: return self.__class__.__name__ + def find_common_start_time(self, reservation_start_times: list[list[datetime]]) -> datetime: + """ + Find the earliest common start time for a group of reservations. + + :param reservation_start_times: A list of lists, where each sublist contains possible start times for a reservation. + :type reservation_start_times: List[List[datetime]] + :return: The earliest common start time, or None if no common start time is found. + :rtype: datetime + """ + if not reservation_start_times: + return None + + # Convert the first list to a set of datetimes + common_times = set(reservation_start_times[0]) + + # Find the intersection with other reservation start times + for start_times in reservation_start_times[1:]: + common_times.intersection_update(start_times) + + # If there are no common times, return None + if not common_times: + return None + + # Return the earliest common start time + return min(common_times) + def determine_future_lease_time(self, computed_reservations: list[LeaseReservationAvro], start: datetime, end: datetime, duration: int) -> tuple[datetime, datetime]: """ @@ -240,8 +277,9 @@ def determine_future_lease_time(self, computed_reservations: list[LeaseReservati :param duration: Requested duration in hours. :type duration: int :return: The nearest available start time and corresponding end time when all reservations can start together. + : Given start, start + duration is returned if no future reservation time can be found resulting in + : slice closure. :rtype: tuple of datetime, datetime - :raises OrchestratorException: If no valid start time can satisfy the requested duration for all reservations. """ states = [ReservationStates.Active.value, ReservationStates.ActiveTicketed.value, @@ -260,12 +298,13 @@ def determine_future_lease_time(self, computed_reservations: list[LeaseReservati sliver=requested_sliver) if not candidate_nodes: - raise OrchestratorException(http_error_code=BAD_REQUEST, - message=f'Insufficient resources: No hosts available to ' - f'provision the {r.get_sliver()}') + self.logger.error(f'Insufficient resources: No hosts available to provision the {r.get_sliver()}') + # Reservation will fail at the Broker with Insufficient resources + # Triggering Slice closure + return start, start + timedelta(hours=duration) # Gather the nearest available start time per candidate node for this reservation - reservation_times = [] + reservation_times = set() for c in candidate_nodes: cbm_node = self.combined_broker_model.build_deep_node_sliver(node_id=c) # Skip if CBM node is not the specific host that is requested @@ -279,27 +318,39 @@ def determine_future_lease_time(self, computed_reservations: list[LeaseReservati tracker = resource_trackers[c] # Add slivers from reservations to the tracker for e in existing: - tracker.add_sliver(sliver=e.get_sliver(), - end=ActorClock.from_milliseconds(milli_seconds=e.get_end()), - reservation_id=e.get_reservation_id()) + tracker.update(reservation=e, start=start, end=end) - future_start_time = tracker.find_next_available(requested_sliver=requested_sliver, from_time=start) - if future_start_time: - reservation_times.append(future_start_time) + start_times = tracker.find_next_available(requested_sliver=requested_sliver, start=start, + end=end, duration=duration) + if start_times: + start_times = sorted(start_times) + reservation_times.update(start_times) if not len(reservation_times): - raise OrchestratorException("Sliver request cannot be satisfied in the requested duration!") + self.logger.error(f"Sliver {requested_sliver} request cannot be satisfied in the requested duration!") + # Reservation will fail at the Broker with Insufficient resources + # Triggering Slice closure + return start, start + timedelta(hours=duration) # Add the earliest start time for the reservation to future_start_times - future_start_times.append(min(reservation_times)) + future_start_times.append(list(reservation_times)) # Find the nearest start time across all reservations where they can start together - simultaneous_start_time = max(future_start_times) - - # Verify that the simultaneous start time allows all reservations to run for the full requested duration + simultaneous_start_time = self.find_common_start_time(reservation_start_times=future_start_times) + if not simultaneous_start_time: + self.logger.error("Slice cannot be satisfied in the requested duration!") + # Reservation will fail at the Broker with Insufficient resources + # Triggering Slice closure + return start, start + timedelta(hours=duration) + + # Verify that the simultaneous start time allows all reservations to run + # for the full requested duration final_time = simultaneous_start_time + timedelta(hours=duration) if final_time > end: - raise OrchestratorException("No common start time available for the requested duration.") + self.logger.error("No common start time available for the requested duration.") + # Reservation will fail at the Broker with Insufficient resources + # Triggering Slice closure + return start, start + timedelta(hours=duration) return simultaneous_start_time, final_time diff --git a/fabric_cf/orchestrator/core/orchestrator_slice_wrapper.py b/fabric_cf/orchestrator/core/orchestrator_slice_wrapper.py index 3d8c6d1e..79450f3d 100644 --- a/fabric_cf/orchestrator/core/orchestrator_slice_wrapper.py +++ b/fabric_cf/orchestrator/core/orchestrator_slice_wrapper.py @@ -26,6 +26,7 @@ import ipaddress import threading import time +from datetime import datetime from ipaddress import IPv4Network from typing import List, Tuple, Dict from http.client import BAD_REQUEST, NOT_FOUND @@ -79,6 +80,9 @@ def __init__(self, *, controller: ABCMgmtControllerMixin, broker: ID, slice_obj: # Reservations trigger ModifyLease (AM) self.computed_modify_properties_reservations = [] self.thread_lock = threading.Lock() + self.start = None + self.end = None + self.lifetime = None def lock(self): """ @@ -147,13 +151,20 @@ def add_reservations(self): self.controller.add_reservation(reservation=r) self.logger.info(f"ADD TIME: {time.time() - start:.0f}") - def create(self, *, slice_graph: ABCASMPropertyGraph) -> List[LeaseReservationAvro]: + def create(self, *, slice_graph: ABCASMPropertyGraph, lease_start_time: datetime = None, + lease_end_time: datetime = None, lifetime: int = None) -> List[LeaseReservationAvro]: """ Create a slice :param slice_graph: Slice Graph + :param lease_start_time: Lease Start Time (UTC) + :param lease_end_time: Lease End Time (UTC) + :param lifetime: Lifetime of the slice in hours :return: List of computed reservations """ try: + self.start = lease_start_time + self.end = lease_end_time + self.lifetime = lifetime # Build Network Node reservations start = time.time() network_node_reservations, node_res_mapping = self.__build_network_node_reservations(slice_graph=slice_graph) diff --git a/fabric_cf/orchestrator/core/resource_tracker.py b/fabric_cf/orchestrator/core/resource_tracker.py index 0fc28d15..6f6330c4 100644 --- a/fabric_cf/orchestrator/core/resource_tracker.py +++ b/fabric_cf/orchestrator/core/resource_tracker.py @@ -24,16 +24,18 @@ # # Author: Komal Thareja (kthare10@renci.org) from collections import defaultdict -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta +from fabric_cf.actor.core.time.actor_clock import ActorClock +from fabric_mb.message_bus.messages.reservation_mng import ReservationMng from fim.slivers.base_sliver import BaseSliver from fim.slivers.attached_components import ComponentSliver, ComponentType -from fim.slivers.capacities_labels import Capacities +from fim.slivers.capacities_labels import Capacities, FreeCapacity from fim.slivers.network_node import NodeSliver class TimeSlot: - """Represents a time slot for resource availability, tracking capacities and components.""" + """Represents a time slot for resources, tracking capacities and components.""" def __init__(self, end: datetime): """ @@ -43,29 +45,34 @@ def __init__(self, end: datetime): :type end: datetime """ self.end = end - self.available_capacities = Capacities() - self.available_components = {} + self.capacities = Capacities() + self.components = {} def __update_capacities(self, capacities: Capacities): """ - Update the available capacities in this time slot. + Update the capacities in this time slot. :param capacities: The capacities to add to this time slot. :type capacities: Capacities """ - self.available_capacities += capacities + self.capacities += capacities def __update_components(self, by_type: dict[ComponentType, list[ComponentSliver]]): """ - Update the available components by type in this time slot. + Update the components by type in this time slot. :param by_type: Dictionary with component types as keys and lists of ComponentSliver as values. :type by_type: dict[ComponentType, list[ComponentSliver]] """ for comp_type, comps in by_type.items(): - if comp_type not in self.available_components: - self.available_components[comp_type] = 0 - self.available_components[comp_type] += len(comps) + if comp_type not in self.components: + self.components[comp_type] = 0 + for c in comps: + if c.get_capacities(): + units = c.get_capacities().unit + else: + units = c.get_capacity_allocations().unit + self.components[comp_type] += units def add_sliver(self, sliver: BaseSliver): """ @@ -85,16 +92,16 @@ def add_sliver(self, sliver: BaseSliver): def __str__(self): """ - Return a string representation of the available capacities and components in this time slot. + Return a string representation of the capacities and components in this time slot. - :return: String representation of available capacities and components. + :return: String representation of capacities and components. :rtype: str """ - return f"Capacities: {self.available_capacities}, Components: {self.available_components}" + return f"Capacities: {self.capacities}, Components: {self.components}" class ResourceTracker: - """Tracks resource availability over time slots and checks availability of resources.""" + """Tracks resource over time slots and checks availability of resources.""" def __init__(self, cbm_node: NodeSliver): """ @@ -110,98 +117,112 @@ def __init__(self, cbm_node: NodeSliver): for comp_type, comps in cbm_node.attached_components_info.by_type.items(): if comp_type not in self.total_components: self.total_components[comp_type] = 0 - self.total_components[comp_type] += len(comps) + for c in comps: + self.total_components[comp_type] += c.get_capacities().unit self.time_slots = defaultdict(TimeSlot) self.reservation_ids = set() - def add_sliver(self, end: datetime, sliver: BaseSliver, reservation_id: str): + def update(self, reservation: ReservationMng, start: datetime, end: datetime): """ - Add sliver to the nearest hour time slot and update total available resources. + Update allocated resource information. - :param end: The end datetime of the reservation for the sliver. + :param reservation: Existing Reservation. + :type reservation: ReservationMng + :param start: Requested start datetime. + :type start: datetime + :param end: Requested end datetime. :type end: datetime - :param sliver: The sliver containing resources to add to the time slot. - :type sliver: BaseSliver - :param reservation_id: Reservation id of the reservation to which the sliver belomgs - :type reservation_id: str """ # Check if reservation has already been captured, if so skip it - if reservation_id in self.reservation_ids: - return - self.reservation_ids.add(reservation_id) - nearest_hour = end.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) - if nearest_hour not in self.time_slots: - self.time_slots[nearest_hour] = TimeSlot(nearest_hour) - slot = self.time_slots[nearest_hour] - slot.add_sliver(sliver=sliver) - if sliver.capacity_allocations: - self.total_capacities -= sliver.capacity_allocations - else: - self.total_capacities -= sliver.capacities - - if not sliver.attached_components_info: + if reservation.get_reservation_id() in self.reservation_ids: return + self.reservation_ids.add(reservation.get_reservation_id()) + + start = start.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) + end = end.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) + sliver_end = ActorClock.from_milliseconds(milli_seconds=reservation.get_end()) + sliver_end = sliver_end.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) + + current_time = start + while current_time < sliver_end and current_time < end: + if current_time not in self.time_slots: + self.time_slots[current_time] = TimeSlot(current_time) + + # Update the specific time slot with the sliver's resource usage + self.time_slots[current_time].add_sliver(sliver=reservation.get_sliver()) - for comp_type, comps in sliver.attached_components_info.by_type.items(): - self.total_components[comp_type] -= len(comps) + current_time += timedelta(hours=1) - @staticmethod - def __check_components(requested_sliver: NodeSliver, - available_components: dict[ComponentType, int]) -> bool: + def __check_components(self, requested_sliver: NodeSliver, + allocated: dict[ComponentType, int]) -> bool: """ Check if requested components can be fulfilled by available components. :param requested_sliver: The sliver with requested components. :type requested_sliver: NodeSliver - :param available_components: Dictionary of available components by type. - :type available_components: dict[ComponentType, int] + :param allocated: Dictionary of available components by type. + :type allocated: dict[ComponentType, int] :return: True if components can be fulfilled, False otherwise. :rtype: bool """ if not requested_sliver.attached_components_info: return True for comp_type, comps in requested_sliver.attached_components_info.by_type.items(): - if comp_type not in available_components: + if comp_type not in self.total_components: return False - elif available_components[comp_type] < len(comps): + elif len(comps) > (self.total_components[comp_type] - allocated.get(comp_type, 0)): return False - else: - available_components[comp_type] -= len(comps) return True - def find_next_available(self, requested_sliver: NodeSliver, - from_time: datetime = datetime.now(timezone.utc)) -> datetime: + def find_next_available(self, requested_sliver: NodeSliver, start: datetime, end: datetime, + duration: int) -> list[datetime]: """ - Find the next available time slot that can fulfill the requested sliver capacities and components. + Find the next available time slot that can fulfill the requested sliver's capacities and components. :param requested_sliver: The sliver with requested capacities and components. :type requested_sliver: NodeSliver - :param from_time: The datetime from which to search for availability. - :type from_time: datetime - :return: The datetime of the next available time slot, or None if not found. + :param start: The start datetime to begin searching for availability. + :type start: datetime + :param end: The end datetime to stop searching. + :type end: datetime + :param duration: The duration in hours for which the resources are needed. + :type duration: int + :return: List of all possible next available time slot, or empty list if not found. :rtype: datetime """ - nearest_hour = from_time.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) + current_time = start.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) + duration_timedelta = timedelta(hours=duration) + ret_val = [] + + while current_time + duration_timedelta <= end: + available = True + for i in range(duration): + time_slot_time = current_time + timedelta(hours=i) + if time_slot_time not in self.time_slots: + # If there's no entry for this time slot, assume full capacity available + continue + + time_slot = self.time_slots[time_slot_time] + free_capacity = FreeCapacity(total=self.total_capacities, allocated=time_slot.capacities) + + # Check if accumulated capacities are negative (means not enough capacity) + if free_capacity.free.negative_fields(): + available = False + break - if not (self.total_capacities - requested_sliver.capacities).negative_fields() and \ - self.__check_components(requested_sliver=requested_sliver, - available_components=self.total_components): - return nearest_hour + diff = free_capacity.free - requested_sliver.capacities + if diff.negative_fields(): + available = False + break - sorted_times = sorted(self.time_slots.keys(), key=lambda x: abs(x - nearest_hour)) + if not self.__check_components(requested_sliver=requested_sliver, allocated=time_slot.components): + available = False + break - accumulated_capacities = Capacities() - accumulated_components = {} + if available: + ret_val.append(current_time) - for closest_time in sorted_times: - slot = self.time_slots[closest_time] - accumulated_capacities += slot.available_capacities - for comp_type, comp_count in slot.available_components.items(): - if comp_type not in accumulated_components: - accumulated_components[comp_type] = 0 - accumulated_components[comp_type] += comp_count + current_time += timedelta(hours=1) - if self.__check_components(requested_sliver, accumulated_components) and \ - not (accumulated_capacities - requested_sliver.capacities).negative_fields(): - return closest_time + return ret_val