Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Tinybird query runner #5616

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ Redash supports more than 35 SQL and NoSQL [data sources](https://redash.io/help
- SPARQL
- SQLite
- TiDB
- Tinybird
- TreasureData
- Trino
- Uptycs
Expand Down
Binary file added client/app/assets/images/db-logos/tinybird.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 1 addition & 5 deletions redash/query_runner/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ def configuration_schema(cls):
"secret": ["password"],
}

@classmethod
def type(cls):
return "clickhouse"

@property
def _url(self):
return urlparse(self.configuration["url"])
Expand Down Expand Up @@ -162,7 +158,7 @@ def _define_column_type(column):
return TYPE_STRING

def _clickhouse_query(self, query, session_id=None, session_check=None):
logger.debug("Clickhouse is about to execute query: %s", query)
logger.debug(f"{self.name()} is about to execute query: %s", query)

query += "\nFORMAT JSON"

Expand Down
113 changes: 113 additions & 0 deletions redash/query_runner/tinybird.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import logging

import requests

from redash.query_runner import register
from redash.query_runner.clickhouse import ClickHouse

logger = logging.getLogger(__name__)


class Tinybird(ClickHouse):
guidopetri marked this conversation as resolved.
Show resolved Hide resolved
noop_query = "SELECT count() FROM tinybird.pipe_stats LIMIT 1"

DEFAULT_URL = "https://api.tinybird.co"

SQL_ENDPOINT = "/v0/sql"
DATASOURCES_ENDPOINT = "/v0/datasources"
PIPES_ENDPOINT = "/v0/pipes"

@classmethod
def configuration_schema(cls):
return {
"type": "object",
"properties": {
"url": {"type": "string", "default": cls.DEFAULT_URL},
"token": {"type": "string", "title": "Auth Token"},
"timeout": {
"type": "number",
"title": "Request Timeout",
"default": 30,
},
"verify": {
"type": "boolean",
"title": "Verify SSL certificate",
"default": True,
},
},
"order": ["url", "token"],
"required": ["token"],
"extra_options": ["timeout", "verify"],
"secret": ["token"],
}

def _get_tables(self, schema):
self._collect_tinybird_schema(
schema,
self.DATASOURCES_ENDPOINT,
"datasources",
)

self._collect_tinybird_schema(
schema,
self.PIPES_ENDPOINT,
"pipes",
)

return list(schema.values())

def _send_query(self, data, session_id=None, session_check=None):
return self._get_from_tinybird(
self.SQL_ENDPOINT,
params={"q": data.encode("utf-8", "ignore")},
)

def _collect_tinybird_schema(self, schema, endpoint, resource_type):
response = self._get_from_tinybird(endpoint)
resources = response.get(resource_type, [])

for r in resources:
if r["name"] not in schema:
schema[r["name"]] = {"name": r["name"], "columns": []}

if resource_type == "pipes" and not r.get("endpoint"):
continue

Check warning on line 74 in redash/query_runner/tinybird.py

View check run for this annotation

Codecov / codecov/patch

redash/query_runner/tinybird.py#L74

Added line #L74 was not covered by tests

query = f"SELECT * FROM {r['name']} LIMIT 1 FORMAT JSON"
try:
query_result = self._send_query(query)
except Exception:
logger.exception(f"error in schema {r['name']}")
continue

Check warning on line 81 in redash/query_runner/tinybird.py

View check run for this annotation

Codecov / codecov/patch

redash/query_runner/tinybird.py#L79-L81

Added lines #L79 - L81 were not covered by tests

columns = [meta["name"] for meta in query_result["meta"]]
schema[r["name"]]["columns"].extend(columns)

return schema

def _get_from_tinybird(self, endpoint, params=None):
url = f"{self.configuration.get('url', self.DEFAULT_URL)}{endpoint}"
authorization = f"Bearer {self.configuration.get('token')}"

try:
response = requests.get(
url,
timeout=self.configuration.get("timeout", 30),
params=params,
headers={"Authorization": authorization},
verify=self.configuration.get("verify", True),
)
except requests.RequestException as e:

Check warning on line 100 in redash/query_runner/tinybird.py

View check run for this annotation

Codecov / codecov/patch

redash/query_runner/tinybird.py#L100

Added line #L100 was not covered by tests
if e.response:
details = f"({e.__class__.__name__}, Status Code: {e.response.status_code})"

Check warning on line 102 in redash/query_runner/tinybird.py

View check run for this annotation

Codecov / codecov/patch

redash/query_runner/tinybird.py#L102

Added line #L102 was not covered by tests
else:
details = f"({e.__class__.__name__})"
raise Exception(f"Connection error to: {url} {details}.")

Check warning on line 105 in redash/query_runner/tinybird.py

View check run for this annotation

Codecov / codecov/patch

redash/query_runner/tinybird.py#L104-L105

Added lines #L104 - L105 were not covered by tests

if response.status_code >= 400:
raise Exception(response.text)

Check warning on line 108 in redash/query_runner/tinybird.py

View check run for this annotation

Codecov / codecov/patch

redash/query_runner/tinybird.py#L108

Added line #L108 was not covered by tests

return response.json()


register(Tinybird)
1 change: 1 addition & 0 deletions redash/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ def email_server_is_configured():
"redash.query_runner.impala_ds",
"redash.query_runner.vertica",
"redash.query_runner.clickhouse",
"redash.query_runner.tinybird",
"redash.query_runner.yandex_metrica",
"redash.query_runner.rockset",
"redash.query_runner.treasuredata",
Expand Down
123 changes: 123 additions & 0 deletions tests/query_runner/test_tinybird.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import json
from unittest import TestCase
from unittest.mock import Mock, patch

from redash.query_runner import TYPE_DATETIME, TYPE_INTEGER, TYPE_STRING
from redash.query_runner.tinybird import Tinybird

DATASOURCES_RESPONSE = {
"datasources": [
{
"id": "t_datasource_id",
"name": "test_datasource",
"columns": [
{
"name": "string_attribute",
"type": "String",
"nullable": False,
"normalized_name": "string_attribute",
},
{
"name": "number_attribute",
"type": "Int32",
"nullable": False,
"normalized_name": "number_attribute",
},
{"name": "date_attribute", "type": "DateTime", "nullable": False, "normalized_name": "date_attribute"},
],
}
]
}

PIPES_RESPONSE = {"pipes": [{"id": "t_pipe_id", "name": "test_pipe", "endpoint": "t_endpoint_id", "type": "endpoint"}]}

SCHEMA_RESPONSE = {
"meta": [
{"name": "string_attribute", "type": "String"},
{"name": "number_attribute", "type": "Int32"},
{"name": "date_attribute", "type": "DateTime"},
]
}

QUERY_RESPONSE = {
**SCHEMA_RESPONSE,
"data": [
{"string_attribute": "hello world", "number_attribute": 123, "date_attribute": "2023-01-01 00:00:03.001000"},
],
"rows": 1,
"statistics": {"elapsed": 0.011556914, "rows_read": 87919, "bytes_read": 17397219},
}


class TestTinybird(TestCase):
@patch("requests.get")
def test_get_schema_scans_pipes_and_datasources(self, get_request):
query_runner = self._build_query_runner()

get_request.side_effect = self._mock_tinybird_schema_requests

schema = query_runner.get_schema()

self.assertEqual(
schema,
[
{"name": "test_datasource", "columns": ["string_attribute", "number_attribute", "date_attribute"]},
{"name": "test_pipe", "columns": ["string_attribute", "number_attribute", "date_attribute"]},
],
)

(url,), kwargs = get_request.call_args

self.assertEqual(kwargs["timeout"], 60)
self.assertEqual(kwargs["headers"], {"Authorization": "Bearer p.test.token"})

@patch("requests.get")
def test_run_query(self, get_request):
query_runner = self._build_query_runner()

get_request.return_value = Mock(
status_code=200, text=json.dumps(QUERY_RESPONSE), json=Mock(return_value=QUERY_RESPONSE)
)

data, error = query_runner.run_query("SELECT * FROM test_datasource LIMIT 1", None)

self.assertIsNone(error)
self.assertEqual(
json.loads(data),
{
"columns": [
{"name": "string_attribute", "friendly_name": "string_attribute", "type": TYPE_STRING},
{"name": "number_attribute", "friendly_name": "number_attribute", "type": TYPE_INTEGER},
{"name": "date_attribute", "friendly_name": "date_attribute", "type": TYPE_DATETIME},
],
"rows": [
{
"string_attribute": "hello world",
"number_attribute": 123,
"date_attribute": "2023-01-01 00:00:03.001000",
}
],
},
)

(url,), kwargs = get_request.call_args

self.assertEqual(url, "https://api.tinybird.co/v0/sql")
self.assertEqual(kwargs["timeout"], 60)
self.assertEqual(kwargs["headers"], {"Authorization": "Bearer p.test.token"})
self.assertEqual(kwargs["params"], {"q": b"SELECT * FROM test_datasource LIMIT 1\nFORMAT JSON"})

def _mock_tinybird_schema_requests(self, endpoint, **kwargs):
response = {}

if endpoint.endswith(Tinybird.PIPES_ENDPOINT):
response = PIPES_RESPONSE
if endpoint.endswith(Tinybird.DATASOURCES_ENDPOINT):
response = DATASOURCES_RESPONSE
if endpoint.endswith(Tinybird.SQL_ENDPOINT):
response = SCHEMA_RESPONSE

return Mock(status_code=200, text=json.dumps(response), json=Mock(return_value=response))

def _build_query_runner(self):
return Tinybird({"url": "https://api.tinybird.co", "token": "p.test.token", "timeout": 60})
Loading