Skip to content

Commit

Permalink
advance scheduling thread to process future slices
Browse files Browse the repository at this point in the history
  • Loading branch information
kthare10 committed Nov 5, 2024
1 parent 2e4cdf7 commit 9b40eef
Show file tree
Hide file tree
Showing 6 changed files with 364 additions and 94 deletions.
3 changes: 1 addition & 2 deletions fabric_cf/actor/core/manage/local/local_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@
from fabric_mb.message_bus.messages.reservation_mng import ReservationMng
from fabric_mb.message_bus.messages.reservation_state_avro import ReservationStateAvro
from fabric_mb.message_bus.messages.slice_avro import SliceAvro
from fabric_cf.actor.core.manage.management_object import ManagementObject
from fabric_cf.actor.security.auth_token import AuthToken


class LocalActor(LocalProxy, ABCMgmtActor):
def __init__(self, *, manager: ManagementObject, auth: AuthToken):
def __init__(self, *, manager: ActorManagementObject, auth: AuthToken):
super().__init__(manager=manager, auth=auth)

if not isinstance(manager, ActorManagementObject):
Expand Down
180 changes: 180 additions & 0 deletions fabric_cf/orchestrator/core/advance_scheduling_thread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2020 FABRIC Testbed
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
#
# Author: Komal Thareja ([email protected])
import queue
import threading
import time
import traceback

from fabric_cf.actor.core.time.actor_clock import ActorClock

from fabric_cf.actor.core.kernel.reservation_states import ReservationStates
from fabric_cf.actor.core.util.id import ID
from fabric_cf.actor.core.util.iterable_queue import IterableQueue
from fabric_cf.orchestrator.core.exceptions import OrchestratorException
from fabric_cf.orchestrator.core.orchestrator_slice_wrapper import OrchestratorSliceWrapper


class AdvanceSchedulingThread:
    """
    Worker thread, started by the Orchestrator, that determines the nearest
    start time for slices requested in the future.

    Its purpose is to let the orchestrator respond to a create request
    without first identifying the start time and waiting for the slivers to
    be demanded. Slices are handed over with queue_slice(); the worker picks
    them up in run() and hands each one to process_slice().
    """

    # Upper bound (seconds) on one idle wait. Producers and stop() notify the
    # condition, so this is only a safety net that makes the worker re-check
    # its state if `stopped` is ever changed without a notification.
    _IDLE_WAIT_SECONDS = 1.0

    def __init__(self, *, kernel):
        """
        :param kernel: object providing get_management_actor(),
                       determine_future_lease_time() and get_defer_thread()
        """
        # Slices waiting to have their future lease window computed
        self.slice_queue = queue.Queue()
        # Signalled when a slice is queued or when the thread is asked to stop
        self.slice_avail_condition = threading.Condition()
        # Guards self.thread so that start()/stop() do not race each other
        self.thread_lock = threading.Lock()
        self.thread = None
        self.stopped = False
        # NOTE(review): imported locally rather than at module level,
        # presumably to avoid a circular import - confirm before moving it.
        from fabric_cf.actor.core.container.globals import GlobalsSingleton
        self.logger = GlobalsSingleton.get().get_logger()
        self.mgmt_actor = kernel.get_management_actor()
        self.kernel = kernel

    def queue_slice(self, *, controller_slice: "OrchestratorSliceWrapper"):
        """
        Queue a slice for advance scheduling and wake up the worker thread.
        Failures are logged and not propagated to the caller.
        :param controller_slice: slice wrapper to be scheduled
        :return:
        """
        try:
            self.slice_queue.put_nowait(controller_slice)
            # Wake the worker, which blocks on this condition while idle
            with self.slice_avail_condition:
                self.slice_avail_condition.notify_all()
            self.logger.debug(f"Added slice to slices queue {controller_slice.get_slice_id()}")
        except Exception as e:
            self.logger.error(f"Failed to queue slice: {controller_slice.get_slice_id()} e: {e}")

    def start(self):
        """
        Start the worker thread as a daemon named after this class.

        Note: `stopped` is not reset here, so an instance on which stop() has
        been called starts a worker that exits immediately.
        :return:
        :raises OrchestratorException: if the thread has already been started
        """
        with self.thread_lock:
            if self.thread is not None:
                raise OrchestratorException(f"This {self.__class__.__name__} has already been started")

            # name/daemon are passed to the constructor; Thread.setName() and
            # Thread.setDaemon() are deprecated since Python 3.10
            self.thread = threading.Thread(target=self.run, name=self.__class__.__name__,
                                           daemon=True)
            self.thread.start()

    def stop(self):
        """
        Ask the worker thread to exit and wait for it to finish.
        Calling stop() when no thread is running is a no-op apart from
        setting the `stopped` flag.
        :return:
        """
        self.stopped = True
        # Detach the thread reference under the lock. The lock is released
        # exactly once (by the `with`) and is not held while joining.
        with self.thread_lock:
            worker = self.thread
            self.thread = None

        if worker is None:
            return

        self.logger.warning(f"It seems that the {self.__class__.__name__} is running. "
                            f"Interrupting it")
        try:
            # Wake the worker if it is idle so that it observes `stopped`
            with self.slice_avail_condition:
                self.slice_avail_condition.notify_all()
            worker.join()
        except Exception as e:
            self.logger.error(f"Could not join {self.__class__.__name__} thread {e}")

    def run(self):
        """
        Thread main loop: wait until slices are queued (or stop is requested),
        drain the queue and process every drained slice. Slices still queued
        when stop is requested are not processed.
        :return:
        """
        self.logger.debug(f"{self.__class__.__name__} started")
        while True:
            # Block until there is work or a stop request instead of polling
            with self.slice_avail_condition:
                while not self.stopped and self.slice_queue.empty():
                    self.slice_avail_condition.wait(timeout=self._IDLE_WAIT_SECONDS)

            if self.stopped:
                self.logger.info(f"{self.__class__.__name__} exiting")
                return

            slices = []
            try:
                for s in IterableQueue(source_queue=self.slice_queue):
                    slices.append(s)
            except Exception as e:
                self.logger.error(f"Error while adding slice to slice queue! e: {e}")
                self.logger.error(traceback.format_exc())

            if slices:
                self.logger.debug(f"Processing {len(slices)} slices")

            for s in slices:
                try:
                    # Determine the nearest start time; on success the slice is
                    # queued on the defer thread by process_slice().
                    # NOTE(review): on failure process_slice() only logs; the
                    # slice is not moved to a closed/failed state here.
                    self.process_slice(controller_slice=s)
                except Exception as e:
                    self.logger.error(f"Error while processing slice {type(s)}, {e}")
                    self.logger.error(traceback.format_exc())

    def process_slice(self, *, controller_slice: "OrchestratorSliceWrapper"):
        """
        Determine the nearest lease window for a slice, store it on the slice
        and on each of its computed reservations, then queue the slice on the
        defer thread. Errors raised while the slice is locked are logged and
        not propagated.
        :param controller_slice: slice wrapper whose start, end and lifetime
                                 describe the requested window
        """
        # Take the lock before entering try/finally so that unlock() only runs
        # when the lock has actually been acquired.
        controller_slice.lock()
        try:
            computed_reservations = controller_slice.get_computed_reservations()
            future_start, future_end = self.kernel.determine_future_lease_time(
                computed_reservations=computed_reservations,
                start=controller_slice.start, end=controller_slice.end,
                duration=controller_slice.lifetime)

            controller_slice.slice_obj.set_lease_start(lease_start=future_start)
            controller_slice.slice_obj.set_lease_end(lease_end=future_end)
            self.logger.debug(f"Update Slice {controller_slice.slice_obj.slice_name}")
            self.mgmt_actor.update_slice(slice_obj=controller_slice.slice_obj)
            for r in computed_reservations:
                r.set_start(value=ActorClock.to_milliseconds(when=future_start))
                r.set_end(value=ActorClock.to_milliseconds(when=future_end))
                self.mgmt_actor.update_reservation(reservation=r)
            self.kernel.get_defer_thread().queue_slice(controller_slice=controller_slice)
        except Exception as e:
            self.logger.error(traceback.format_exc())
            self.logger.error(f"Unable to determine future lease time or update slice: {e}")
        finally:
            controller_slice.unlock()
12 changes: 10 additions & 2 deletions fabric_cf/orchestrator/core/orchestrator_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,17 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
new_slice_object.lock()

# Create Slivers from Slice Graph; Compute Reservations from Slivers;
computed_reservations = new_slice_object.create(slice_graph=asm_graph)
computed_reservations = new_slice_object.create(slice_graph=asm_graph,
lease_start_time=lease_start_time,
lease_end_time=lease_end_time,
lifetime=lifetime)
new_slice_object.update_topology(topology=topology)

# Check if Testbed in Maintenance or Site in Maintenance
self.check_maintenance_mode(token=fabric_token, reservations=computed_reservations)

# TODO Future Slice
'''
if lease_start_time and lease_end_time and lifetime:
future_start, future_end = self.controller_state.determine_future_lease_time(computed_reservations=computed_reservations,
start=lease_start_time, end=lease_end_time,
Expand All @@ -336,6 +340,7 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
for r in computed_reservations:
r.set_start(value=ActorClock.to_milliseconds(when=future_start))
r.set_end(value=ActorClock.to_milliseconds(when=future_end))
'''

# Add Reservations to relational database;
new_slice_object.add_reservations()
Expand All @@ -346,7 +351,10 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
# Demand thread is responsible for demanding the reservations
# Helps improve the create response time
create_ts = time.time()
self.controller_state.get_defer_thread().queue_slice(controller_slice=new_slice_object)
if lease_start_time and lease_end_time and lifetime:
self.controller_state.get_advance_scheduling_thread().queue_slice(controller_slice=controller)
else:
self.controller_state.get_defer_thread().queue_slice(controller_slice=new_slice_object)
self.logger.info(f"QU queue: TIME= {time.time() - create_ts:.0f}")
EventLoggerSingleton.get().log_slice_event(slice_object=slice_obj, action=ActionId.create,
topology=topology)
Expand Down
Loading

0 comments on commit 9b40eef

Please sign in to comment.