diff --git a/octavia_f5/api/drivers/f5_driver/arbiter.py b/octavia_f5/api/drivers/f5_driver/arbiter.py
index 7e87a6d9..b7de33d0 100644
--- a/octavia_f5/api/drivers/f5_driver/arbiter.py
+++ b/octavia_f5/api/drivers/f5_driver/arbiter.py
@@ -14,15 +14,15 @@
 
 import abc
 
+from octavia.common import base_taskflow
 from oslo_config import cfg
 from oslo_log import log as logging
 from taskflow import flow
 from taskflow.listeners import logging as tf_logging
 from taskflow.patterns import linear_flow, unordered_flow
 
-from octavia.common import base_taskflow
-from octavia_f5.controller.worker.tasks import network_tasks
 from octavia_f5.api.drivers.f5_driver.tasks import reschedule_tasks
+from octavia_f5.controller.worker.tasks import network_tasks
 from octavia_f5.utils import driver_utils
 
 CONF = cfg.CONF
@@ -83,13 +83,22 @@ def get_reschedule_flow(self) -> flow.Flow:
         update_vip_sub_flow = linear_flow.Flow("update-vip-sub-flow")
         update_vip_sub_flow.add(get_vip_port_task, update_vip_task, all_selfips_task, update_aap_task)
-        # update loadbalancer, amphora and vip and invalidate cache can be run parallelized
+        # Updating the load balancer, amphora and VIP and invalidating the cache can run in parallel
         update_database_flow = unordered_flow.Flow("database-update-flow")
         update_database_flow.add(rewrite_loadbalancer_task, rewrite_amphora_task,
                                  update_vip_sub_flow, invalidate_cache_task)
 
+        # We want to hold a lock between adding/removing the load balancer and updating the database, so that
+        # the load balancer won't be seen as an orphan (because it's on the wrong worker according to the DB)
+        # and cleaned up by the worker loop.
+        loadbalancer_lock_task = reschedule_tasks.LockLoadBalancer()
+        loadbalancer_release_task = reschedule_tasks.ReleaseLoadBalancer()
+
+        switchover_load_balancer_flow = linear_flow.Flow('switchover-flow')
+        switchover_load_balancer_flow.add(loadbalancer_lock_task, add_remove_loadbalancer_flow,
+                                          update_database_flow, loadbalancer_release_task)
+
         reschedule_flow = linear_flow.Flow('reschedule-flow')
         reschedule_flow.add(get_loadbalancer_task, get_old_agent_task, create_selfips_task,
-                            wait_for_selfip_task, add_remove_loadbalancer_flow,
-                            update_database_flow)
+                            wait_for_selfip_task, switchover_load_balancer_flow)
 
         return reschedule_flow
diff --git a/octavia_f5/api/drivers/f5_driver/tasks/reschedule_tasks.py b/octavia_f5/api/drivers/f5_driver/tasks/reschedule_tasks.py
index b525005f..74293873 100644
--- a/octavia_f5/api/drivers/f5_driver/tasks/reschedule_tasks.py
+++ b/octavia_f5/api/drivers/f5_driver/tasks/reschedule_tasks.py
@@ -14,14 +14,14 @@
 
 from abc import ABCMeta
 
+from octavia.common import data_models as models
+from octavia.db import api as db_apis
 from oslo_config import cfg
 from oslo_log import log as logging
 from taskflow import task
 from taskflow.types import failure
 
-from octavia.common import constants
-from octavia.common import data_models as models
-from octavia.db import api as db_apis
+from octavia_f5.common import constants as f5_const
 from octavia_f5.db import repositories as repo
 
 LOG = logging.getLogger(__name__)
@@ -37,6 +37,30 @@ def __init__(self, **kwargs):
         self._amphora_repo = repo.AmphoraRepository()
 
 
+class LockLoadBalancer(RescheduleTasks):
+    def execute(self, loadbalancer_id):
+        LOG.debug("Locking load balancer: %s", loadbalancer_id)
+        # Fail if the LB is already locked. We probably don't want to wait until it's unlocked, since that
+        # would mean doing two migrations back-to-back, which we definitely want to avoid.
+        if self._amphora_repo.get(db_apis.get_session(), id=loadbalancer_id) \
+                .vrrp_interface == f5_const.RESCHEDULING_LOCK_STRING:
+            return failure.Failure(causes=["Cannot acquire lock for load balancer {}. It's already being rescheduled.".format(loadbalancer_id)])
+        self._amphora_repo.update(db_apis.get_session(), loadbalancer_id,
+                                  vrrp_interface=f5_const.RESCHEDULING_LOCK_STRING)
+        # TODO temp - Testing revert
+        return failure.Failure(causes=["Testing revert"])
+
+    def revert(self, loadbalancer_id, **kwargs):
+        self._amphora_repo.update(db_apis.get_session(), loadbalancer_id, vrrp_interface=None)
+
+
+class ReleaseLoadBalancer(RescheduleTasks):
+    def execute(self, loadbalancer_id):
+        LOG.debug("Releasing load balancer: %s", loadbalancer_id)
+        self._amphora_repo.update(db_apis.get_session(), loadbalancer_id, vrrp_interface=None)
+        # TODO error handling without rolling back the whole migration
+
+
 class GetLoadBalancerByID(RescheduleTasks):
     default_provides = 'load_balancer'
diff --git a/octavia_f5/common/constants.py b/octavia_f5/common/constants.py
index 215b76ab..3fbb039e 100644
--- a/octavia_f5/common/constants.py
+++ b/octavia_f5/common/constants.py
@@ -60,6 +60,8 @@
 ROLE_MASTER = 'MASTER'
 ROLE_BACKUP = 'BACKUP'
 
+RESCHEDULING_LOCK_STRING = 'RESCHEDULING'
+
 SEGMENT = 'segment'
 VIF_TYPE = 'f5'
 ESD = 'esd'
diff --git a/octavia_f5/controller/worker/sync_manager.py b/octavia_f5/controller/worker/sync_manager.py
index 3912c225..1a6eea55 100644
--- a/octavia_f5/controller/worker/sync_manager.py
+++ b/octavia_f5/controller/worker/sync_manager.py
@@ -183,6 +183,9 @@ def tenant_update(self, network_id, device=None, selfips=None, loadbalancers=Non
         if not loadbalancers:
             raise exceptions.AS3Exception("No loadbalancers specified for tenant_update")
 
+        # If any of the load balancers is currently being rescheduled, hold back the whole declaration, because we don't know whether that load balancer is supposed to exist on this device or not.
+        pass  # TODO
+
         decl = self._declaration_manager.get_declaration({network_id: loadbalancers}, self_ips)
         if CONF.f5_agent.dry_run:
             decl.set_action('dry-run')
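
The TODO left in tenant_update() could, for example, be filled along the lines of the sketch below. It reuses only pieces this patch already introduces (AmphoraRepository, db_apis.get_session() and RESCHEDULING_LOCK_STRING); the helper name _is_being_rescheduled and the assumption that every entry in loadbalancers carries an id attribute are illustrative, not part of the change.

    # Sketch only, not part of this patch: check whether any of the given load
    # balancers is currently locked for rescheduling via the amphora vrrp_interface
    # marker, mirroring the lookup done in LockLoadBalancer.execute() above.
    from octavia.db import api as db_apis

    from octavia_f5.common import constants as f5_const
    from octavia_f5.db import repositories as repo

    def _is_being_rescheduled(loadbalancers):
        """Return True if any load balancer is locked by a running rescheduling flow."""
        amphora_repo = repo.AmphoraRepository()
        session = db_apis.get_session()
        for lb in loadbalancers:
            # The reschedule tasks address the amphora entry by the load balancer ID,
            # so the same key is used here (assumption: lb exposes an id attribute).
            amphora = amphora_repo.get(session, id=lb.id)
            if amphora and amphora.vrrp_interface == f5_const.RESCHEDULING_LOCK_STRING:
                return True
        return False

tenant_update() could then return early, for example with "if _is_being_rescheduled(loadbalancers): return", in place of the pass  # TODO line, so the declaration is held back until the lock is released again.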