From ae1659e60e68f330b5e11e5105d5f9925811f9ef Mon Sep 17 00:00:00 2001 From: Amit Srivastava Date: Sun, 1 Oct 2023 23:57:39 -0700 Subject: [PATCH] [core][computes] fixed compute selection for variety of requests The main bug being fixed is the broken describe db/table call on the table browser. The compute information was incorrectly being picked up and causing even the non-compute setup to be broken. There are several different ways that compute information is passed in the api requests and within the code. This commit creates a single set of functions to handle the compute selection. Change-Id: I0cb42b8fcfefac14ce193384a61f4e87b8848031 --- apps/beeswax/src/beeswax/common.py | 81 +++++++++++++++++-- apps/beeswax/src/beeswax/models.py | 6 +- apps/beeswax/src/beeswax/server/dbms.py | 5 +- apps/metastore/src/metastore/views.py | 13 +-- desktop/libs/notebook/src/notebook/api.py | 5 +- .../notebook/src/notebook/connectors/base.py | 28 +++---- 6 files changed, 96 insertions(+), 42 deletions(-) diff --git a/apps/beeswax/src/beeswax/common.py b/apps/beeswax/src/beeswax/common.py index 7fba3b57f7..e424406aee 100644 --- a/apps/beeswax/src/beeswax/common.py +++ b/apps/beeswax/src/beeswax/common.py @@ -25,22 +25,25 @@ from django import forms +from beeswax.models import Namespace, Compute HIVE_IDENTIFER_REGEX = re.compile("(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$") -DL_FORMATS = [ 'csv', 'xls' ] +DL_FORMATS = ['csv', 'xls'] -SELECTION_SOURCE = [ '', 'table', 'constant', ] +SELECTION_SOURCE = ['', 'table', 'constant',] -AGGREGATIONS = [ '', 'COUNT', 'SUM', 'AVG', 'MIN', 'MAX' ] +AGGREGATIONS = ['', 'COUNT', 'SUM', 'AVG', 'MIN', 'MAX'] -JOIN_TYPES = [ '', 'LEFT OUTER JOIN', 'RIGHT OUTER JOIN', 'FULL OUTER JOIN', 'JOIN' ] +JOIN_TYPES = ['', 'LEFT OUTER JOIN', 'RIGHT OUTER JOIN', 'FULL OUTER JOIN', 'JOIN'] -SORT_OPTIONS = [ '', 'ascending', 'descending' ] +SORT_OPTIONS = ['', 'ascending', 'descending'] -RELATION_OPS_UNARY = [ 'IS NULL', 'IS NOT NULL', 'NOT' ] +RELATION_OPS_UNARY = ['IS NULL', 'IS NOT NULL', 'NOT'] -RELATION_OPS = [ '=', '<>', '<', '<=', '>', '>=' ] + RELATION_OPS_UNARY +RELATION_OPS = ['=', '<>', '<', '<=', '>', '>='] + RELATION_OPS_UNARY + +COMPUTE_TYPES = ['hive-compute', 'impala-compute'] TERMINATORS = [ # (hive representation, description, ascii value) @@ -67,7 +70,7 @@ def to_choices(x): Maps [a, b, c] to [(a,a), (b,b), (c,c)]. Useful for making ChoiceField's. """ - return [ (y, y) for y in x ] + return [(y, y) for y in x] def apply_natural_sort(collection, key=None): @@ -85,6 +88,68 @@ def tokenize_and_convert(item, key=None): return sorted(collection, key=lambda i: tokenize_and_convert(i, key=key)) +def is_compute(cluster): + if not cluster: + return False + connector = cluster.get('connector') + compute = cluster.get('compute') + compute_check = lambda x: x and x.get('type') in COMPUTE_TYPES + return compute_check(cluster) or compute_check(connector) or compute_check(compute) + + +''' +find_compute attempts to find a compute based on the provided criteria. +Following is the priority order +1. A full/partial compute object available in cluster +2. Lookup namespace based on namespace_id and return the first compute + filtered by user-access. Needs valid user and namespace_id +3. Lookup namespace based on dialect from cluster or prpvided dialect + and return the first compute filtered by user-access. Needs valid user +''' +def find_compute(cluster=None, user=None, dialect=None, namespace_id=None): + if cluster: + # If we find a full/partial cluster object, we will attempt to load a compute + connector = cluster.get('connector') + compute = cluster.get('compute') + compute_check = lambda x: x and x.get('type') in COMPUTE_TYPES + + # Pick the most probable compute object + selected_compute = (cluster if compute_check(cluster) + else compute if compute_check(compute) + else connector if compute_check(connector) else None) + + # If found, we will attempt to reload it, first by id then by name + if selected_compute: + if selected_compute.get('id'): + c = Compute.objects.filter(id=selected_compute['id']).first().to_dict() + if c: + return c + + if selected_compute.get('name'): + c = Compute.objects.filter(name=selected_compute['name']).first().to_dict() + if c: + return c + + # If we could not load by id or name, then we want to pick a default compute based on dialect + dialect = selected_compute['dialect'] if selected_compute.get('dialect') else dialect + if not dialect and cluster.get('type'): + t = cluster['type'] + dialect = 'hive' if t.startswith('hive') else 'impala' if t.startswith('impala') else None + + # We will attempt to find a default compute based on other criteria + ns = None + if namespace_id: + ns = Namespace.objects.filter(id=namespace_id).first() + + if not ns and dialect: + ns = Namespace.objects.filter(dialect=dialect).first() + + if ns and user: + computes = ns.get_computes(user) if ns else None + if computes: + return computes[0] + + class HiveIdentifierField(forms.RegexField): """ Corresponds to 'Identifier' in Hive.g (Hive's grammar) diff --git a/apps/beeswax/src/beeswax/models.py b/apps/beeswax/src/beeswax/models.py index 19dc935057..ff01425618 100644 --- a/apps/beeswax/src/beeswax/models.py +++ b/apps/beeswax/src/beeswax/models.py @@ -649,8 +649,7 @@ def to_dict(self): 'description': self.description, 'dialect': self.dialect, 'interface': self.interface, - 'external_id': self.external_id, - 'last_modified': self.last_modified + 'external_id': self.external_id } class Compute(models.Model): @@ -692,8 +691,7 @@ def to_dict(self): 'interface': self.interface, 'is_ready': self.is_ready, 'options': self.options, - 'external_id': self.external_id, - 'last_modified': self.last_modified + 'external_id': self.external_id } @property diff --git a/apps/beeswax/src/beeswax/server/dbms.py b/apps/beeswax/src/beeswax/server/dbms.py index 964c8d3c0a..a698086a2b 100644 --- a/apps/beeswax/src/beeswax/server/dbms.py +++ b/apps/beeswax/src/beeswax/server/dbms.py @@ -45,7 +45,7 @@ HIVE_DISCOVERY_HS2, HIVE_DISCOVERY_LLAP, HIVE_DISCOVERY_LLAP_HA, HIVE_DISCOVERY_LLAP_ZNODE, CACHE_TIMEOUT, \ LLAP_SERVER_HOST, LLAP_SERVER_PORT, LLAP_SERVER_THRIFT_PORT, USE_SASL as HIVE_USE_SASL, CLOSE_SESSIONS, has_session_pool, \ MAX_NUMBER_OF_SESSIONS -from beeswax.common import apply_natural_sort +from beeswax.common import apply_natural_sort, is_compute from beeswax.design import hql_query from beeswax.hive_site import hiveserver2_use_ssl, hiveserver2_impersonation_enabled, get_hiveserver2_kerberos_principal, \ hiveserver2_transport_mode, hiveserver2_thrift_http_path @@ -143,8 +143,7 @@ def get(user, query_server=None, cluster=None): def get_query_server_config(name='beeswax', connector=None): - if connector and (has_connectors() or connector.get('compute') - or connector.get('type') in ('hive-compute', 'impala-compute')): + if connector and (has_connectors() or is_compute(connector)): LOG.debug("Query via connector %s (%s)" % (name, connector.get('type'))) query_server = get_query_server_config_via_connector(connector) else: diff --git a/apps/metastore/src/metastore/views.py b/apps/metastore/src/metastore/views.py index f1761d8d34..0a591a488e 100644 --- a/apps/metastore/src/metastore/views.py +++ b/apps/metastore/src/metastore/views.py @@ -36,7 +36,8 @@ from desktop.models import Document2, get_cluster_config, _get_apps from beeswax.design import hql_query -from beeswax.models import SavedQuery, Namespace +from beeswax.common import find_compute +from beeswax.models import SavedQuery from beeswax.server import dbms from beeswax.server.dbms import get_query_server_config from desktop.lib.view_util import location_to_url @@ -782,15 +783,7 @@ def _find_cluster(request): cluster = json.loads(request.POST.get('cluster', '{}')) source_type = request.POST.get('source_type', request.GET.get('connector_id', request.GET.get('source_type', 'hive'))) namespace_id = request.GET.get('namespace') - if not cluster: - # Find the default compute - if namespace_id: - ns = Namespace.objects.filter(id=namespace_id).first() - else: - ns = Namespace.objects.filter(dialect=source_type).first() - if ns: - computes = ns.get_computes(request.user) if ns else None - cluster = computes[0] if computes else None + cluster = find_compute(cluster=cluster, user=request.user, namespace_id=namespace_id, dialect=source_type) return cluster def _get_servername(db): diff --git a/desktop/libs/notebook/src/notebook/api.py b/desktop/libs/notebook/src/notebook/api.py index b32e654249..361f098ab2 100644 --- a/desktop/libs/notebook/src/notebook/api.py +++ b/desktop/libs/notebook/src/notebook/api.py @@ -1039,9 +1039,10 @@ def describe(request, database, table=None, column=None): source_type = request.POST.get('source_type', '') cluster = json.loads(request.POST.get('cluster', '{}')) connector = cluster if cluster else json.loads(request.POST.get('connector', '{}')) + compute = json.loads(request.POST.get('compute', '{}')) - snippet = {'type': source_type, 'connector': connector} - patch_snippet_for_connector(snippet) + snippet = {'type': source_type, 'connector': connector, 'compute': compute} + patch_snippet_for_connector(snippet, user=request.user) describe = get_api(request, snippet).describe(notebook, snippet, database, table, column=column) response.update(describe) diff --git a/desktop/libs/notebook/src/notebook/connectors/base.py b/desktop/libs/notebook/src/notebook/connectors/base.py index d3d19e1572..668c8298a8 100644 --- a/desktop/libs/notebook/src/notebook/connectors/base.py +++ b/desktop/libs/notebook/src/notebook/connectors/base.py @@ -25,13 +25,12 @@ from django.utils.encoding import smart_str -from beeswax.models import Compute +from beeswax.common import find_compute, is_compute from desktop.auth.backend import is_admin from desktop.conf import TASK_SERVER, has_connectors from desktop.lib import export_csvxls from desktop.lib.exceptions_renderable import PopupException from desktop.lib.i18n import smart_unicode -from desktop.models import get_cluster_config from metadata.optimizer.base import get_api as get_optimizer_api from notebook.conf import get_ordered_interpreters @@ -402,17 +401,20 @@ def get_interpreter(connector_type, user=None): return interpreter[0] -def patch_snippet_for_connector(snippet): +def patch_snippet_for_connector(snippet, user=None): """ Connector backward compatibility switcher. # TODO Connector unification """ - if snippet['type'] == 'hive-compute' or snippet['type'] == 'impala-compute': - # No patching is needed + if is_compute(snippet): + snippet['connector'] = find_compute(cluster=snippet, user=user) + if snippet['connector'] and snippet['connector'].get('dialect'): + snippet['dialect'] = snippet['connector']['dialect'] return if snippet.get('connector') and snippet['connector'].get('type'): - if snippet['connector']['dialect'] != 'hplsql': # this is a workaround for hplsql describe not working + if snippet['connector'].get('dialect') != 'hplsql': # this is a workaround for hplsql describe not working snippet['type'] = snippet['connector']['type'] # To rename to 'id' + if snippet.get('connector') and snippet['connector'].get('dialect'): snippet['dialect'] = snippet['connector']['dialect'] else: snippet['dialect'] = snippet['type'] @@ -427,7 +429,7 @@ def get_api(request, snippet): if snippet.get('type') == 'report': snippet['type'] = 'impala' - patch_snippet_for_connector(snippet) + patch_snippet_for_connector(snippet, request.user) connector_name = snippet['type'] @@ -435,13 +437,9 @@ def get_api(request, snippet): if has_connectors() and snippet.get('type') == 'hello' and is_admin(request.user): LOG.debug('Using the interpreter from snippet') interpreter = snippet.get('interpreter') - elif get_cluster_config(request.user).get('has_computes'): - if snippet.get('type') in ('hive-compute', 'impala-compute') and snippet.get('id'): - LOG.debug("Loading the compute from db using snippet['id']: %s" % snippet['id']) - interpreter = Compute.objects.get(id=snippet['id']).to_dict() - if snippet.get('compute'): - LOG.debug("Using the compute as is from snippet['compute']") - interpreter = snippet['compute'] + elif is_compute(snippet): + LOG.debug("Finding the compute from db using snippet: %s" % snippet) + interpreter = find_compute(cluster=snippet, user=request.user) elif has_connectors() and snippet.get('connector'): LOG.debug("Connectors are enabled and picking the connector from snippet['connector']") interpreter = snippet['connector'] @@ -619,7 +617,7 @@ def statement_risk(self, interface, notebook, snippet): query = response['statement'] client = get_optimizer_api(self.user, interface) - patch_snippet_for_connector(snippet) + patch_snippet_for_connector(snippet, self.user) return client.query_risk(query=query, source_platform=snippet['dialect'], db_name=snippet.get('database') or 'default')