Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add get_indexes #18

Merged
merged 2 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion sqlalchemy_risingwave/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,38 @@ def get_columns(self, conn, table_name, schema=None, **kw):
return res

def get_indexes(self, conn, table_name, schema=None, **kw):
return []
table_oid = self.get_table_oid(
conn, table_name, schema, info_cache=kw.get("info_cache")
)

sql = (
"select i.relname, a.attname from pg_catalog.pg_class t "
"join pg_catalog.pg_index ix on t.oid = ix.indrelid "
"join pg_catalog.pg_class i on i.oid = ix.indexrelid "
"join pg_catalog.pg_attribute a on t.oid = a.attrelid and a.attnum = ANY(ix.indkey)"
"where t.oid = :table_oid"
)
rows = conn.execute(
text(sql),
{"table_oid": table_oid},
)

indexes = {}
for row in rows:
if not row.relname in indexes:
indexes[row.relname] = []
indexes[row.relname].append(row.attname)

res = []
for index in indexes:
res.append(
{
"name": index,
"column_names": indexes[index],
"unique": False,
}
)
return res

def get_foreign_keys_v1(self, conn, table_name, schema=None, **kw):
raise []
Expand Down
26 changes: 16 additions & 10 deletions test/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,11 @@ class SchemaTest(fixtures.TestBase):

def teardown_method(self, method):
with testing.db.begin() as conn:
conn.execute(text("DROP TABLE IF EXISTS users"))
conn.execute("DROP TABLE IF EXISTS users")

def setup_method(self):
with testing.db.begin() as conn:
conn.execute(
text(
"""
CREATE TABLE users (
name STRING PRIMARY KEY
)
"""
)
)
conn.execute("CREATE TABLE users (name STRING PRIMARY KEY)")
self.meta = MetaData(schema="public")

def test_get_columns_indexes_across_schema(self):
Expand All @@ -34,3 +26,17 @@ def test_returning_clause(self):

for t in table_names:
assert t == str("users")

def test_get_indexes(self):
with testing.db.begin() as conn:
conn.execute("CREATE TABLE three_columns (id1 INT, id2 INT, id3 INT)")
conn.execute("CREATE INDEX three_columns_idx ON three_columns(id2) INCLUDE(id1)")

insp = inspect(testing.db)
indexes = insp.get_indexes("three_columns")

assert len(indexes) == 1
assert indexes[0]["name"] == "three_columns_idx"
assert indexes[0]["column_names"] == ["id2", "id1"]
Comment on lines +39 to +40
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get pk of this index?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we can't. We have the ability to add and use indnkeyatts column in pg_index. The value of indnkeyatts means that how many keys in front of indkey are pk. We can support getting pk when we need it.


conn.execute("DROP TABLE three_columns")