Skip to content

Commit

Permalink
feat: pass api_access_token to executor client
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangvi7 committed Jul 3, 2024
1 parent 208f6d0 commit 9dc92ae
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 8 deletions.
9 changes: 7 additions & 2 deletions querybook/server/app/auth/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ class AuthenticationError(Exception):


class AuthUser(UserMixin):
def __init__(self, user: User):
def __init__(self, user: User, api_access_token=False):
self._user_dict = user.to_dict(with_roles=True)
self._api_access_token = api_access_token

@property
def api_access_token(self):
return self._api_access_token

@property
def id(self):
Expand Down Expand Up @@ -72,7 +77,7 @@ def load_user_with_api_access_token(request):
if token_validation:
if token_validation.enabled:
user = get_user_by_id(token_validation.creator_uid, session=session)
return AuthUser(user)
return AuthUser(user, api_access_token=True)
else:
flask.abort(
UNAUTHORIZED_STATUS_CODE, description="Token is disabled."
Expand Down
7 changes: 4 additions & 3 deletions querybook/server/datasources/query_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ def create_query_execution(

try:
run_query_task.apply_async(
args=[
query_execution.id,
]
kwargs={
"query_execution_id": query_execution.id,
"api_access_token": current_user.api_access_token,
},
)
query_execution_dict = query_execution.to_dict()

Expand Down
2 changes: 2 additions & 0 deletions querybook/server/lib/query_executor/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,12 @@ def __init__(
statement_ranges,
client_setting,
execution_type,
api_access_token=False,
):
self._query = query
self._query_execution_id = query_execution_id
self._execution_type = execution_type
self._api_access_token = api_access_token

if self.SINGLE_QUERY_QUERY_ENGINE():
self._statement_ranges = [[0, len(query)]]
Expand Down
8 changes: 7 additions & 1 deletion querybook/server/lib/query_executor/executor_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@

@with_session
def create_executor_from_execution(
query_execution_id, celery_task, execution_type, session=None
query_execution_id,
celery_task,
execution_type,
api_access_token=False,
session=None,
):
executor_params, engine = _get_executor_params_and_engine(
query_execution_id,
celery_task=celery_task,
execution_type=execution_type,
session=session,
)
executor_params["api_access_token"] = api_access_token

executor = get_executor_class(engine.language, engine.executor)(**executor_params)
return executor

Expand Down
10 changes: 8 additions & 2 deletions querybook/server/tasks/run_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
acks_late=True,
)
def run_query_task(
self, query_execution_id, execution_type=QueryExecutionType.ADHOC.value
self,
query_execution_id,
execution_type=QueryExecutionType.ADHOC.value,
api_access_token=False,
):
stats_logger.incr(QUERY_EXECUTIONS, tags={"execution_type": execution_type})

Expand All @@ -39,7 +42,10 @@ def run_query_task(

try:
executor = create_executor_from_execution(
query_execution_id, celery_task=self, execution_type=execution_type
query_execution_id,
celery_task=self,
execution_type=execution_type,
api_access_token=api_access_token,
)
run_executor_until_finish(self, executor)
except SoftTimeLimitExceeded:
Expand Down

0 comments on commit 9dc92ae

Please sign in to comment.