Skip to content

Commit

Permalink
[core][computes] fixed compute selection for variety of requests
Browse files Browse the repository at this point in the history
The main bug being fixed is the broken describe db/table call on the
table browser. The compute information was incorrectly being picked up
and causing even the non-compute setup to be broken.

There are several different ways that compute information is passed
in the api requests and within the code. This commit creates a single
set of functions to handle the compute selection.

Change-Id: I0cb42b8fcfefac14ce193384a61f4e87b8848031
  • Loading branch information
amitsrivastava committed Oct 3, 2023
1 parent 3faef3b commit ae1659e
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 42 deletions.
81 changes: 73 additions & 8 deletions apps/beeswax/src/beeswax/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,25 @@

from django import forms

from beeswax.models import Namespace, Compute

HIVE_IDENTIFER_REGEX = re.compile("(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$")

DL_FORMATS = [ 'csv', 'xls' ]
DL_FORMATS = ['csv', 'xls']

SELECTION_SOURCE = [ '', 'table', 'constant', ]
SELECTION_SOURCE = ['', 'table', 'constant',]

AGGREGATIONS = [ '', 'COUNT', 'SUM', 'AVG', 'MIN', 'MAX' ]
AGGREGATIONS = ['', 'COUNT', 'SUM', 'AVG', 'MIN', 'MAX']

JOIN_TYPES = [ '', 'LEFT OUTER JOIN', 'RIGHT OUTER JOIN', 'FULL OUTER JOIN', 'JOIN' ]
JOIN_TYPES = ['', 'LEFT OUTER JOIN', 'RIGHT OUTER JOIN', 'FULL OUTER JOIN', 'JOIN']

SORT_OPTIONS = [ '', 'ascending', 'descending' ]
SORT_OPTIONS = ['', 'ascending', 'descending']

RELATION_OPS_UNARY = [ 'IS NULL', 'IS NOT NULL', 'NOT' ]
RELATION_OPS_UNARY = ['IS NULL', 'IS NOT NULL', 'NOT']

RELATION_OPS = [ '=', '<>', '<', '<=', '>', '>=' ] + RELATION_OPS_UNARY
RELATION_OPS = ['=', '<>', '<', '<=', '>', '>='] + RELATION_OPS_UNARY

COMPUTE_TYPES = ['hive-compute', 'impala-compute']

TERMINATORS = [
# (hive representation, description, ascii value)
Expand All @@ -67,7 +70,7 @@ def to_choices(x):
Maps [a, b, c] to [(a,a), (b,b), (c,c)].
Useful for making ChoiceField's.
"""
return [ (y, y) for y in x ]
return [(y, y) for y in x]


def apply_natural_sort(collection, key=None):
Expand All @@ -85,6 +88,68 @@ def tokenize_and_convert(item, key=None):
return sorted(collection, key=lambda i: tokenize_and_convert(i, key=key))


def is_compute(cluster):
    """
    Return True if the given cluster dict refers to a compute.

    A cluster counts as a compute when the cluster itself, its 'connector'
    entry, or its 'compute' entry carries a 'type' listed in COMPUTE_TYPES
    ('hive-compute' / 'impala-compute').
    """
    if not cluster:
        return False

    def has_compute_type(obj):
        # Missing/None entries are simply "not a compute".
        return bool(obj) and obj.get('type') in COMPUTE_TYPES

    # any() guarantees a strict bool; the original or-chain could leak a
    # falsy non-bool (e.g. None) when no candidate matched.
    candidates = (cluster, cluster.get('connector'), cluster.get('compute'))
    return any(has_compute_type(c) for c in candidates)


'''
find_compute attempts to find a compute based on the provided criteria.
Following is the priority order:
1. A full/partial compute object available in cluster.
2. Lookup namespace based on namespace_id and return the first compute
   filtered by user-access. Needs valid user and namespace_id.
3. Lookup namespace based on dialect from cluster or the provided dialect
   and return the first compute filtered by user-access. Needs valid user.
'''
def find_compute(cluster=None, user=None, dialect=None, namespace_id=None):
    """
    Attempt to find a compute based on the provided criteria.

    Priority order:
      1. A full/partial compute object embedded in ``cluster`` (reloaded
         from the db by id, then by name, when possible).
      2. Namespace looked up by ``namespace_id``; its first compute
         filtered by user access. Needs a valid ``user`` and ``namespace_id``.
      3. Namespace looked up by dialect (from ``cluster`` or the ``dialect``
         argument); its first compute filtered by user access. Needs a
         valid ``user``.

    Returns a compute dict, or None when nothing matches.
    """
    if cluster:
        # If we find a full/partial cluster object, we will attempt to load a compute.
        connector = cluster.get('connector')
        compute = cluster.get('compute')

        def has_compute_type(obj):
            return bool(obj) and obj.get('type') in COMPUTE_TYPES

        # Pick the most probable compute object.
        selected_compute = (cluster if has_compute_type(cluster)
                            else compute if has_compute_type(compute)
                            else connector if has_compute_type(connector) else None)

        # If found, attempt to reload it from the db, first by id then by name.
        if selected_compute:
            if selected_compute.get('id'):
                # Guard against a stale id: .first() may return None and the
                # original code crashed with AttributeError on .to_dict().
                obj = Compute.objects.filter(id=selected_compute['id']).first()
                if obj:
                    return obj.to_dict()

            if selected_compute.get('name'):
                obj = Compute.objects.filter(name=selected_compute['name']).first()
                if obj:
                    return obj.to_dict()

            # Could not load by id or name; fall back to a default compute
            # picked by dialect below.
            dialect = selected_compute.get('dialect') or dialect

        # Derive the dialect from the cluster type when nothing better is
        # available (works even when no embedded compute object was found).
        if not dialect and cluster.get('type'):
            cluster_type = cluster['type']
            dialect = ('hive' if cluster_type.startswith('hive')
                       else 'impala' if cluster_type.startswith('impala') else None)

    # Attempt to find a default compute from a namespace.
    ns = None
    if namespace_id:
        ns = Namespace.objects.filter(id=namespace_id).first()

    if not ns and dialect:
        ns = Namespace.objects.filter(dialect=dialect).first()

    if ns and user:
        computes = ns.get_computes(user)
        if computes:
            return computes[0]

    # Explicit: no compute could be determined.
    return None

class HiveIdentifierField(forms.RegexField):
"""
Corresponds to 'Identifier' in Hive.g (Hive's grammar)
Expand Down
6 changes: 2 additions & 4 deletions apps/beeswax/src/beeswax/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,7 @@ def to_dict(self):
'description': self.description,
'dialect': self.dialect,
'interface': self.interface,
'external_id': self.external_id,
'last_modified': self.last_modified
'external_id': self.external_id
}

class Compute(models.Model):
Expand Down Expand Up @@ -692,8 +691,7 @@ def to_dict(self):
'interface': self.interface,
'is_ready': self.is_ready,
'options': self.options,
'external_id': self.external_id,
'last_modified': self.last_modified
'external_id': self.external_id
}

@property
Expand Down
5 changes: 2 additions & 3 deletions apps/beeswax/src/beeswax/server/dbms.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
HIVE_DISCOVERY_HS2, HIVE_DISCOVERY_LLAP, HIVE_DISCOVERY_LLAP_HA, HIVE_DISCOVERY_LLAP_ZNODE, CACHE_TIMEOUT, \
LLAP_SERVER_HOST, LLAP_SERVER_PORT, LLAP_SERVER_THRIFT_PORT, USE_SASL as HIVE_USE_SASL, CLOSE_SESSIONS, has_session_pool, \
MAX_NUMBER_OF_SESSIONS
from beeswax.common import apply_natural_sort
from beeswax.common import apply_natural_sort, is_compute
from beeswax.design import hql_query
from beeswax.hive_site import hiveserver2_use_ssl, hiveserver2_impersonation_enabled, get_hiveserver2_kerberos_principal, \
hiveserver2_transport_mode, hiveserver2_thrift_http_path
Expand Down Expand Up @@ -143,8 +143,7 @@ def get(user, query_server=None, cluster=None):


def get_query_server_config(name='beeswax', connector=None):
if connector and (has_connectors() or connector.get('compute')
or connector.get('type') in ('hive-compute', 'impala-compute')):
if connector and (has_connectors() or is_compute(connector)):
LOG.debug("Query via connector %s (%s)" % (name, connector.get('type')))
query_server = get_query_server_config_via_connector(connector)
else:
Expand Down
13 changes: 3 additions & 10 deletions apps/metastore/src/metastore/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
from desktop.models import Document2, get_cluster_config, _get_apps

from beeswax.design import hql_query
from beeswax.models import SavedQuery, Namespace
from beeswax.common import find_compute
from beeswax.models import SavedQuery
from beeswax.server import dbms
from beeswax.server.dbms import get_query_server_config
from desktop.lib.view_util import location_to_url
Expand Down Expand Up @@ -782,15 +783,7 @@ def _find_cluster(request):
cluster = json.loads(request.POST.get('cluster', '{}'))
source_type = request.POST.get('source_type', request.GET.get('connector_id', request.GET.get('source_type', 'hive')))
namespace_id = request.GET.get('namespace')
if not cluster:
# Find the default compute
if namespace_id:
ns = Namespace.objects.filter(id=namespace_id).first()
else:
ns = Namespace.objects.filter(dialect=source_type).first()
if ns:
computes = ns.get_computes(request.user) if ns else None
cluster = computes[0] if computes else None
cluster = find_compute(cluster=cluster, user=request.user, namespace_id=namespace_id, dialect=source_type)
return cluster

def _get_servername(db):
Expand Down
5 changes: 3 additions & 2 deletions desktop/libs/notebook/src/notebook/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1039,9 +1039,10 @@ def describe(request, database, table=None, column=None):
source_type = request.POST.get('source_type', '')
cluster = json.loads(request.POST.get('cluster', '{}'))
connector = cluster if cluster else json.loads(request.POST.get('connector', '{}'))
compute = json.loads(request.POST.get('compute', '{}'))

snippet = {'type': source_type, 'connector': connector}
patch_snippet_for_connector(snippet)
snippet = {'type': source_type, 'connector': connector, 'compute': compute}
patch_snippet_for_connector(snippet, user=request.user)

describe = get_api(request, snippet).describe(notebook, snippet, database, table, column=column)
response.update(describe)
Expand Down
28 changes: 13 additions & 15 deletions desktop/libs/notebook/src/notebook/connectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@

from django.utils.encoding import smart_str

from beeswax.models import Compute
from beeswax.common import find_compute, is_compute
from desktop.auth.backend import is_admin
from desktop.conf import TASK_SERVER, has_connectors
from desktop.lib import export_csvxls
from desktop.lib.exceptions_renderable import PopupException
from desktop.lib.i18n import smart_unicode
from desktop.models import get_cluster_config
from metadata.optimizer.base import get_api as get_optimizer_api

from notebook.conf import get_ordered_interpreters
Expand Down Expand Up @@ -402,17 +401,20 @@ def get_interpreter(connector_type, user=None):
return interpreter[0]


def patch_snippet_for_connector(snippet):
def patch_snippet_for_connector(snippet, user=None):
"""
Connector backward compatibility switcher.
# TODO Connector unification
"""
if snippet['type'] == 'hive-compute' or snippet['type'] == 'impala-compute':
# No patching is needed
if is_compute(snippet):
snippet['connector'] = find_compute(cluster=snippet, user=user)
if snippet['connector'] and snippet['connector'].get('dialect'):
snippet['dialect'] = snippet['connector']['dialect']
return
if snippet.get('connector') and snippet['connector'].get('type'):
if snippet['connector']['dialect'] != 'hplsql': # this is a workaround for hplsql describe not working
if snippet['connector'].get('dialect') != 'hplsql': # this is a workaround for hplsql describe not working
snippet['type'] = snippet['connector']['type'] # To rename to 'id'
if snippet.get('connector') and snippet['connector'].get('dialect'):
snippet['dialect'] = snippet['connector']['dialect']
else:
snippet['dialect'] = snippet['type']
Expand All @@ -427,21 +429,17 @@ def get_api(request, snippet):
if snippet.get('type') == 'report':
snippet['type'] = 'impala'

patch_snippet_for_connector(snippet)
patch_snippet_for_connector(snippet, request.user)

connector_name = snippet['type']

interpreter = None
if has_connectors() and snippet.get('type') == 'hello' and is_admin(request.user):
LOG.debug('Using the interpreter from snippet')
interpreter = snippet.get('interpreter')
elif get_cluster_config(request.user).get('has_computes'):
if snippet.get('type') in ('hive-compute', 'impala-compute') and snippet.get('id'):
LOG.debug("Loading the compute from db using snippet['id']: %s" % snippet['id'])
interpreter = Compute.objects.get(id=snippet['id']).to_dict()
if snippet.get('compute'):
LOG.debug("Using the compute as is from snippet['compute']")
interpreter = snippet['compute']
elif is_compute(snippet):
LOG.debug("Finding the compute from db using snippet: %s" % snippet)
interpreter = find_compute(cluster=snippet, user=request.user)
elif has_connectors() and snippet.get('connector'):
LOG.debug("Connectors are enabled and picking the connector from snippet['connector']")
interpreter = snippet['connector']
Expand Down Expand Up @@ -619,7 +617,7 @@ def statement_risk(self, interface, notebook, snippet):
query = response['statement']

client = get_optimizer_api(self.user, interface)
patch_snippet_for_connector(snippet)
patch_snippet_for_connector(snippet, self.user)

return client.query_risk(query=query, source_platform=snippet['dialect'], db_name=snippet.get('database') or 'default')

Expand Down

0 comments on commit ae1659e

Please sign in to comment.