Skip to content

Commit

Permalink
Merge branch 'master' into fix-query-schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
arikfr authored Oct 27, 2024
2 parents 49fa2b1 + ba973eb commit 271de55
Show file tree
Hide file tree
Showing 12 changed files with 249 additions and 98 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/restyled.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Restyled

on:
pull_request:

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
restyled:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha }}

- uses: restyled-io/actions/setup@v4
- id: restyler
uses: restyled-io/actions/run@v4
with:
fail-on-differences: true

- if: |
!cancelled() &&
steps.restyler.outputs.success == 'true' &&
github.event.pull_request.head.repo.full_name == github.repository
uses: peter-evans/create-pull-request@v6
with:
base: ${{ steps.restyler.outputs.restyled-base }}
branch: ${{ steps.restyler.outputs.restyled-head }}
title: ${{ steps.restyler.outputs.restyled-title }}
body: ${{ steps.restyler.outputs.restyled-body }}
labels: "restyled"
reviewers: ${{ github.event.pull_request.user.login }}
delete-branch: true
64 changes: 64 additions & 0 deletions migrations/versions/9e8c841d1a30_fix_hash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""fix_hash
Revision ID: 9e8c841d1a30
Revises: 7205816877ec
Create Date: 2024-10-05 18:55:35.730573
"""
import logging
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table
from sqlalchemy import select

from redash.query_runner import BaseQueryRunner, get_query_runner


# revision identifiers, used by Alembic.
revision = '9e8c841d1a30'
down_revision = '7205816877ec'
branch_labels = None
depends_on = None


def update_query_hash(record):
should_apply_auto_limit = record['options'].get("apply_auto_limit", False) if record['options'] else False
query_runner = get_query_runner(record['type'], {}) if record['type'] else BaseQueryRunner({})
query_text = record['query']

parameters_dict = {p["name"]: p.get("value") for p in record['options'].get('parameters', [])} if record.options else {}
if any(parameters_dict):
print(f"Query {record['query_id']} has parameters. Hash might be incorrect.")

return query_runner.gen_query_hash(query_text, should_apply_auto_limit)


def upgrade():
conn = op.get_bind()

metadata = sa.MetaData(bind=conn)
queries = sa.Table("queries", metadata, autoload=True)
data_sources = sa.Table("data_sources", metadata, autoload=True)

joined_table = queries.outerjoin(data_sources, queries.c.data_source_id == data_sources.c.id)

query = select([
queries.c.id.label("query_id"),
queries.c.query,
queries.c.query_hash,
queries.c.options,
data_sources.c.id.label("data_source_id"),
data_sources.c.type
]).select_from(joined_table)

for record in conn.execute(query):
new_hash = update_query_hash(record)
print(f"Updating hash for query {record['query_id']} from {record['query_hash']} to {new_hash}")
conn.execute(
queries.update()
.where(queries.c.id == record['query_id'])
.values(query_hash=new_hash))


def downgrade():
pass
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "redash-client",
"version": "24.09.0-dev",
"version": "24.10.0-dev",
"description": "The frontend part of Redash.",
"main": "index.js",
"scripts": {
Expand Down
129 changes: 62 additions & 67 deletions poetry.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ force-exclude = '''

[tool.poetry]
name = "redash"
version = "24.09.0-dev"
version = "24.10.0-dev"
description = "Make Your Company Data Driven. Connect to any data source, easily visualize, dashboard and share your data."
authors = ["Arik Fraimovich <[email protected]>"]
# to be added to/removed from the mailing list, please reach out to Arik via the above email or Discord
Expand All @@ -29,7 +29,7 @@ authlib = "0.15.5"
backoff = "2.2.1"
blinker = "1.6.2"
click = "8.1.3"
cryptography = "42.0.8"
cryptography = "43.0.1"
disposable-email-domains = ">=0.0.52"
flask = "2.3.2"
flask-limiter = "3.3.1"
Expand Down Expand Up @@ -65,7 +65,7 @@ pyyaml = "6.0.1"
redis = "4.6.0"
regex = "2023.8.8"
requests = "2.32.3"
restrictedpython = "6.2"
restrictedpython = "7.3"
rq = "1.16.1"
rq-scheduler = "0.13.1"
semver = "2.8.1"
Expand Down Expand Up @@ -130,7 +130,7 @@ python-rapidjson = "1.20"
requests-aws-sign = "0.1.5"
sasl = ">=0.1.3"
simple-salesforce = "0.74.3"
snowflake-connector-python = "3.12.0"
snowflake-connector-python = "3.12.3"
td-client = "1.0.0"
thrift = ">=0.8.0"
thrift-sasl = ">=0.1.0"
Expand Down
2 changes: 1 addition & 1 deletion redash/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from redash.destinations import import_destinations
from redash.query_runner import import_query_runners

__version__ = "24.09.0-dev"
__version__ = "24.10.0-dev"


if os.environ.get("REMOTE_DEBUG"):
Expand Down
16 changes: 16 additions & 0 deletions redash/cli/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,22 @@
manager = AppGroup(help="Queries management commands.")


@manager.command(name="rehash")
def rehash():
from redash import models

for q in models.Query.query.all():
old_hash = q.query_hash
q.update_query_hash()
new_hash = q.query_hash

if old_hash != new_hash:
print(f"Query {q.id} has changed hash from {old_hash} to {new_hash}")
models.db.session.add(q)

models.db.session.commit()


@manager.command(name="add_tag")
@argument("query_id")
@argument("tag")
Expand Down
4 changes: 2 additions & 2 deletions redash/metrics/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlalchemy.engine import Engine
from sqlalchemy.event import listens_for
from sqlalchemy.orm.util import _ORMJoin
from sqlalchemy.sql.selectable import Alias
from sqlalchemy.sql.selectable import Alias, Join

from redash import statsd_client

Expand All @@ -18,7 +18,7 @@ def _table_name_from_select_element(elt):
if isinstance(t, Alias):
t = t.original.froms[0]

while isinstance(t, _ORMJoin):
while isinstance(t, _ORMJoin) or isinstance(t, Join):
t = t.left

return t.name
Expand Down
2 changes: 1 addition & 1 deletion redash/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def get_status():


def rq_job_ids():
queues = Queue.all(connection=redis_connection)
queues = Queue.all(connection=rq_redis_connection)

started_jobs = [StartedJobRegistry(queue=q).get_job_ids() for q in queues]
queued_jobs = [q.job_ids for q in queues]
Expand Down
55 changes: 36 additions & 19 deletions redash/tasks/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from rq import Queue as BaseQueue
from rq.job import Job as BaseJob
from rq.job import JobStatus
from rq.timeouts import HorseMonitorTimeoutException, UnixSignalDeathPenalty
from rq.timeouts import HorseMonitorTimeoutException
from rq.utils import utcnow
from rq.worker import (
HerokuWorker, # HerokuWorker implements graceful shutdown on SIGTERM
Expand Down Expand Up @@ -113,30 +113,44 @@ def enforce_hard_limit(self, job):
)
self.kill_horse()

def monitor_work_horse(self, job, queue):
def monitor_work_horse(self, job: "Job", queue: "Queue"):
"""The worker will monitor the work horse and make sure that it
either executes successfully or the status of the job is set to
failed
Args:
job (Job): _description_
queue (Queue): _description_
"""
self.monitor_started = utcnow()
retpid = ret_val = rusage = None
job.started_at = utcnow()
while True:
try:
with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
retpid, ret_val = os.waitpid(self._horse_pid, 0)
with self.death_penalty_class(self.job_monitoring_interval, HorseMonitorTimeoutException):
retpid, ret_val, rusage = self.wait_for_horse()
break
except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive.
self.heartbeat(self.job_monitoring_interval + 5)
self.set_current_job_working_time((utcnow() - job.started_at).total_seconds())

job.refresh()
# Kill the job from this side if something is really wrong (interpreter lock/etc).
if job.timeout != -1 and self.current_job_working_time > (job.timeout + 60): # type: ignore
self.heartbeat(self.job_monitoring_interval + 60)
self.kill_horse()
self.wait_for_horse()
break

self.maintain_heartbeats(job)

if job.is_cancelled:
self.stop_executing_job(job)

if self.soft_limit_exceeded(job):
self.enforce_hard_limit(job)

except OSError as e:
# In case we encountered an OSError due to EINTR (which is
# caused by a SIGINT or SIGTERM signal during
Expand All @@ -149,29 +163,32 @@ def monitor_work_horse(self, job, queue):
# Send a heartbeat to keep the worker alive.
self.heartbeat()

self.set_current_job_working_time(0)
self._horse_pid = 0 # Set horse PID to 0, horse has finished working
if ret_val == os.EX_OK: # The process exited normally.
return

job_status = job.get_status()

if job_status is None: # Job completed and its ttl has expired
return
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
elif self._stopped_job_id == job.id:
# Work-horse killed deliberately
self.log.warning("Job stopped by user, moving job to FailedJobRegistry")
if job.stopped_callback:
job.execute_stopped_callback(self.death_penalty_class)
self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.")
elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
if not job.ended_at:
job.ended_at = utcnow()

# Unhandled failure: move the job to the failed queue
self.log.warning(
(
"Moving job to FailedJobRegistry "
"(work-horse terminated unexpectedly; waitpid returned {})" # fmt: skip
).format(ret_val)
)

self.handle_job_failure(
job,
queue=queue,
exc_string="Work-horse process was terminated unexpectedly "
"(waitpid returned %s)" % ret_val, # fmt: skip
)
signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else ""
exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; "
self.log.warning("Moving job to FailedJobRegistry (%s)", exc_string)

self.handle_work_horse_killed(job, retpid, ret_val, rusage)
self.handle_job_failure(job, queue=queue, exc_string=exc_string)


class RedashWorker(StatsdRecordingWorker, HardLimitingWorker):
Expand Down
23 changes: 23 additions & 0 deletions tests/test_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from unittest.mock import MagicMock, patch

from redash import rq_redis_connection
from redash.monitor import rq_job_ids


def test_rq_job_ids_uses_rq_redis_connection():
mock_queue = MagicMock()
mock_queue.job_ids = []

mock_registry = MagicMock()
mock_registry.get_job_ids.return_value = []

with patch("redash.monitor.Queue") as mock_Queue, patch(
"redash.monitor.StartedJobRegistry"
) as mock_StartedJobRegistry:
mock_Queue.all.return_value = [mock_queue]
mock_StartedJobRegistry.return_value = mock_registry

rq_job_ids()

mock_Queue.all.assert_called_once_with(connection=rq_redis_connection)
mock_StartedJobRegistry.assert_called_once_with(queue=mock_queue)
6 changes: 3 additions & 3 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8137,9 +8137,9 @@ http-parser-js@>=0.5.1:
integrity sha512-SGeBX54F94Wgu5RH3X5jsDtf4eHyRogWX1XGT3b4HuW3tQPM4AaBzoUji/4AAJNXCEOWZ5O0DgZmJw1947gD5Q==

http-proxy-middleware@^2.0.3:
version "2.0.6"
resolved "https://registry.yarnpkg.com/http-proxy-middleware/-/http-proxy-middleware-2.0.6.tgz#e1a4dd6979572c7ab5a4e4b55095d1f32a74963f"
integrity sha512-ya/UeJ6HVBYxrgYotAZo1KvPWlgB48kUJLDePFeneHsVujFaW5WNj2NgWCAE//B1Dl02BIfYlpNgBy8Kf8Rjmw==
version "2.0.7"
resolved "https://registry.yarnpkg.com/http-proxy-middleware/-/http-proxy-middleware-2.0.7.tgz#915f236d92ae98ef48278a95dedf17e991936ec6"
integrity sha512-fgVY8AV7qU7z/MmXJ/rxwbrtQH4jBQ9m7kp3llF0liB7glmFeVZFBepQb32T3y8n8k2+AEYuMPCpinYW+/CuRA==
dependencies:
"@types/http-proxy" "^1.17.8"
http-proxy "^1.18.1"
Expand Down

0 comments on commit 271de55

Please sign in to comment.