diff --git a/backend/src/xfd_django/xfd_api/tasks/run_syncdb.py b/backend/src/xfd_django/xfd_api/tasks/run_syncdb.py index c806300e..88c4279b 100644 --- a/backend/src/xfd_django/xfd_api/tasks/run_syncdb.py +++ b/backend/src/xfd_django/xfd_api/tasks/run_syncdb.py @@ -3,15 +3,11 @@ # Third-Party Libraries import django - -# Set the Django settings module -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xfd_django.settings") -os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" - -# Initialize Django -django.setup() - from django.core.management import call_command +from django.db import migrations, connection +from django.db.migrations.executor import MigrationExecutor + +# Custom Utilities from xfd_api.tasks.syndb_helpers import manage_elasticsearch_indices, populate_sample_data @@ -20,24 +16,31 @@ def handler(event, context): Lambda handler to trigger syncdb. """ + # Set the Django settings module + os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xfd_django.settings") + os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" + + # Initialize Django + django.setup() + # Parse arguments from the event dangerouslyforce = event.get("dangerouslyforce", False) populate = event.get("populate", False) try: - # Step 1: Database Reset and Migration + # Drop and recreate the database if dangerouslyforce is true if dangerouslyforce: print("Dropping and recreating the database...") call_command("flush", "--noinput") - call_command("migrate") # Apply migrations - else: - print("Applying migrations...") - call_command("migrate") # Apply migrations - # Step 2: Elasticsearch Index Management + # Generate and apply migrations dynamically + print("Applying migrations dynamically...") + apply_dynamic_migrations() + + # Elasticsearch Index Management manage_elasticsearch_indices(dangerouslyforce) - # Step 3: Populate Sample Data + # Populate Sample Data if populate: print("Populating the database with sample data...") populate_sample_data() @@ -53,3 +56,26 @@ def handler(event, context): "statusCode": 500, "body": f"Database synchronization failed: {str(e)}", } + + +def apply_dynamic_migrations(): + """ + Dynamically detect, create, and apply migrations without writing migration files. + """ + connection.prepare_database() # Ensure the database is initialized + executor = MigrationExecutor(connection) + executor.loader.build_graph() # Load the migration graph + + # Detect unapplied migrations + targets = executor.loader.graph.leaf_nodes() + print(targets) + plan = executor.migration_plan(targets) + print(plan) + + if not plan: + print("No migrations to apply.") + return + + # Apply migrations dynamically + print(f"Applying migrations: {targets}") + executor.migrate(targets)