From 4f04a28f4b53578d8f5123a5e919ecf8d6a9bd0f Mon Sep 17 00:00:00 2001 From: Virgin Bitton Date: Tue, 4 Apr 2023 13:45:13 +0200 Subject: [PATCH] feat: Add JoinField and update indexation command to select on which models to start the indexation --- django_opensearch_dsl/documents.py | 9 +- django_opensearch_dsl/fields.py | 4 + .../management/commands/opensearch.py | 170 ++++++++++++++---- django_opensearch_dsl/registries.py | 4 + 4 files changed, 145 insertions(+), 42 deletions(-) diff --git a/django_opensearch_dsl/documents.py b/django_opensearch_dsl/documents.py index 3a6f4f4..52906e1 100644 --- a/django_opensearch_dsl/documents.py +++ b/django_opensearch_dsl/documents.py @@ -63,9 +63,11 @@ def search(cls, using=None, index=None): using=cls._get_using(using), index=cls._default_index(index), doc_type=[cls], model=cls.django.model ) - def get_queryset(self, filter_: Optional[Q] = None, exclude: Optional[Q] = None, count: int = None) -> QuerySet: + def get_queryset( + self, db_alias: str = None, filter_: Optional[Q] = None, exclude: Optional[Q] = None, count: int = None + ) -> QuerySet: """Return the queryset that should be indexed by this doc type.""" - qs = self.django.model.objects.all() + qs = self.django.model.objects.using(db_alias).all() if filter_: qs = qs.filter(filter_) @@ -88,6 +90,7 @@ def _eta(self, start, done, total): # pragma: no cover def get_indexing_queryset( self, + db_alias: str = None, verbose: bool = False, filter_: Optional[Q] = None, exclude: Optional[Q] = None, @@ -97,7 +100,7 @@ def get_indexing_queryset( ) -> Iterable: """Divide the queryset into chunks.""" chunk_size = self.django.queryset_pagination - qs = self.get_queryset(filter_=filter_, exclude=exclude, count=count) + qs = self.get_queryset(db_alias=db_alias, filter_=filter_, exclude=exclude, count=count) qs = qs.order_by("pk") if not qs.query.is_sliced else qs count = qs.count() model = self.django.model.__name__ diff --git a/django_opensearch_dsl/fields.py b/django_opensearch_dsl/fields.py index 41b98bc..cb77f7e 100644 --- a/django_opensearch_dsl/fields.py +++ b/django_opensearch_dsl/fields.py @@ -218,6 +218,10 @@ class IpField(DODField, fields.Ip): """Allow indexing of IPv4 and IPv6 addresses.""" +class JoinField(DODField, fields.Join): + """Allow indexing of Join fields (with parent/child relation).""" + + class LongField(DODField, fields.Long): """Allow indexing of long. diff --git a/django_opensearch_dsl/management/commands/opensearch.py b/django_opensearch_dsl/management/commands/opensearch.py index 59e912b..84881be 100644 --- a/django_opensearch_dsl/management/commands/opensearch.py +++ b/django_opensearch_dsl/management/commands/opensearch.py @@ -12,6 +12,7 @@ from django.core.exceptions import FieldError from django.core.management import BaseCommand from django.core.management.base import OutputWrapper +from django.db import DEFAULT_DB_ALIAS from django.db.models import Q from django_opensearch_dsl.registries import registry @@ -109,6 +110,13 @@ def _manage_index(self, action, indices, force, verbosity, ignore_error, **optio ) # noqa self.stdout.flush() try: + # If current index depends on many different models, add them to + # index._doc_types before indexing to make sure all mappings of different models + # are taken into account. + index_models = registry.get_indices_raw().get(index, None) + for model in list(index_models): + index._doc_types.append(model) + if action == OpensearchAction.CREATE: index.create() elif action == OpensearchAction.DELETE: @@ -133,7 +141,20 @@ def _manage_index(self, action, indices, force, verbosity, ignore_error, **optio self.stdout.write(f"{pp} index '{index._name}'... {self.style.SUCCESS('OK')}") # noqa def _manage_document( - self, action, indices, force, filters, excludes, verbosity, parallel, count, refresh, missing, **options + self, + action, + indices, + objects, + force, + filters, + excludes, + verbosity, + parallel, + count, + refresh, + missing, + database, + **options, ): # noqa """Manage the creation and deletion of indices.""" action = OpensearchAction(action) @@ -141,6 +162,17 @@ def _manage_document( filter_ = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in filters)) if filters else None exclude = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in excludes)) if excludes else None + # Filter existing objects + valid_models = [] + registered_models = [m.__name__.lower() for m in registry.get_models()] + if objects: + for model in objects: + if model.lower() in registered_models: + valid_models.append(model) + else: + self.stderr.write(f"Unknown object '{model}', choices are: '{registered_models}'") + exit(1) + # Filter indices if indices: # Ensure every given indices exists @@ -165,23 +197,50 @@ def _manage_document( # Check field, preparing to display expected actions s = f"The following documents will be {action.past}:" kwargs_list = [] - for index in indices: + + if objects: + django_models = [m for m in registry.get_models() if m.__name__.lower() in valid_models] + all_os_models = [] + selected_os_models = [] + indices_raw = registry.get_indices_raw() + + for k, v in indices_raw.items(): + for model in list(v): + all_os_models.append(model) + + for os_model in all_os_models: + if os_model.django.model in django_models and os_model.Index.name in list(i._name for i in indices): + selected_os_models.append(os_model) + # Handle --missing exclude_ = exclude - if missing and action == OpensearchAction.INDEX: - q = Q(pk__in=[h.meta.id for h in index.search().extra(stored_fields=[]).scan()]) - exclude_ = exclude_ & q if exclude_ is not None else q - - document = index._doc_types[0]() # noqa - try: - kwargs_list.append({"filter_": filter_, "exclude": exclude_, "count": count}) - qs = document.get_queryset(filter_=filter_, exclude=exclude_, count=count).count() - except FieldError as e: - model = index._doc_types[0].django.model.__name__ # noqa - self.stderr.write(f"Error while filtering on '{model}' (from index '{index._name}'):\n{e}'") # noqa - exit(1) - else: - s += f"\n\t- {qs} {document.django.model.__name__}." + for model in selected_os_models: + try: + kwargs_list.append({"filter_": filter_, "exclude": exclude_, "count": count}) + qs = model().get_queryset(filter_=filter_, exclude=exclude_, count=count).count() + except FieldError as e: + self.stderr.write(f"Error while filtering on '{model.django.model.__name__}':\n{e}'") # noqa + exit(1) + else: + s += f"\n\t- {qs} {model.django.model.__name__}." + else: + for index in indices: + # Handle --missing + exclude_ = exclude + if missing and action == OpensearchAction.INDEX: + q = Q(pk__in=[h.meta.id for h in index.search().extra(stored_fields=[]).scan()]) + exclude_ = exclude_ & q if exclude_ is not None else q + + document = index._doc_types[0]() # noqa + try: + kwargs_list.append({"db_alias": database, "filter_": filter_, "exclude": exclude_, "count": count}) + qs = document.get_queryset(filter_=filter_, exclude=exclude_, count=count).count() + except FieldError as e: + model = index._doc_types[0].django.model.__name__ # noqa + self.stderr.write(f"Error while filtering on '{model}' (from index '{index._name}'):\n{e}'") # noqa + exit(1) + else: + s += f"\n\t- {qs} {document.django.model.__name__}." # Display expected actions if verbosity or not force: @@ -198,28 +257,53 @@ def _manage_document( exit(1) result = "\n" - for index, kwargs in zip(indices, kwargs_list): - document = index._doc_types[0]() # noqa - qs = document.get_indexing_queryset(stdout=self.stdout._out, verbose=verbosity, action=action, **kwargs) - success, errors = document.update( - qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False - ) - - success_str = self.style.SUCCESS(success) if success else success - errors_str = self.style.ERROR(len(errors)) if errors else len(errors) - model = document.django.model.__name__ - - if verbosity == 1: - result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n" - reasons = defaultdict(int) - for e in errors: # Count occurrence of each error - error = e.get(action, {"result": "unknown error"}).get("result", "unknown error") - reasons[error] += 1 - for reasons, total in reasons.items(): - result += f" - {reasons} : {total}\n" - - if verbosity > 1: - result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n" + if objects: + for model, kwargs in zip(selected_os_models, kwargs_list): + document = model() # noqa + qs = document.get_indexing_queryset(stdout=self.stdout._out, verbose=verbosity, action=action, **kwargs) + success, errors = document.update( + qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False + ) + + success_str = self.style.SUCCESS(success) if success else success + errors_str = self.style.ERROR(len(errors)) if errors else len(errors) + model = document.django.model.__name__ + + if verbosity == 1: + result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n" + reasons = defaultdict(int) + for e in errors: # Count occurrence of each error + error = e.get(action, {"result": "unknown error"}).get("result", "unknown error") + reasons[error] += 1 + for reasons, total in reasons.items(): + result += f" - {reasons} : {total}\n" + + if verbosity > 1: + result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n" + + else: + for index, kwargs in zip(indices, kwargs_list): + document = index._doc_types[0]() # noqa + qs = document.get_indexing_queryset(stdout=self.stdout._out, verbose=verbosity, action=action, **kwargs) + success, errors = document.update( + qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False + ) + + success_str = self.style.SUCCESS(success) if success else success + errors_str = self.style.ERROR(len(errors)) if errors else len(errors) + model = document.django.model.__name__ + + if verbosity == 1: + result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n" + reasons = defaultdict(int) + for e in errors: # Count occurrence of each error + error = e.get(action, {"result": "unknown error"}).get("result", "unknown error") + reasons[error] += 1 + for reasons, total in reasons.items(): + result += f" - {reasons} : {total}\n" + + if verbosity > 1: + result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n" if verbosity: self.stdout.write(result + "\n") @@ -237,7 +321,7 @@ def add_arguments(self, parser): ) subparser.set_defaults(func=self.__list_index) - # 'manage' subcommand + # 'index' subcommand subparser = subparsers.add_parser( "index", help="Manage the creation an deletion of indices.", @@ -288,6 +372,13 @@ def add_arguments(self, parser): OpensearchAction.UPDATE.value, ], ) + subparser.add_argument( + "-d", + "--database", + type=str, + default=None, + help="Nominates a database to use as source.", + ) subparser.add_argument( "-f", "--filters", @@ -321,6 +412,7 @@ def add_arguments(self, parser): subparser.add_argument( "-i", "--indices", type=str, nargs="*", help="Only update documents on the given indices." ) + subparser.add_argument("-o", "--objects", type=str, nargs="*", help="Only update selected objects.") subparser.add_argument( "-c", "--count", type=int, default=None, help="Update at most COUNT objects (0 to index everything)." ) diff --git a/django_opensearch_dsl/registries.py b/django_opensearch_dsl/registries.py index ca3678b..81f5c1b 100644 --- a/django_opensearch_dsl/registries.py +++ b/django_opensearch_dsl/registries.py @@ -180,5 +180,9 @@ def __contains__(self, obj): f"'in <{type(self).__name__}>' requires a Model subclass as left operand, not {type(dict).__name__}" ) + def get_indices_raw(self): + """Get all indices as they are store in the registry or the indices for a list of models.""" + return self._indices + registry = DocumentRegistry()