-
Notifications
You must be signed in to change notification settings - Fork 6
/
sql2meili.py
290 lines (255 loc) · 9.68 KB
/
sql2meili.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
"""
This module provides a way to export SQL tables to a Meilisearch
instance in an easy and fast way.
"""
import argparse
import json
import logging
from math import ceil
from os.path import isfile
from sys import exit as sys_exit
from typing import Dict, List, Tuple

import jsonschema
import meilisearch
from jsonschema import validate
from sqlalchemy import engine, create_engine, MetaData, func, select
# Alias used throughout this module to annotate data parsed from JSON files
# (the configuration and the schema, both loaded with json.load).
Json = Dict
class DatabaseConn:
    """
    Class that holds all the data and
    relevant methods pertaining to the
    database

    ``database_engine`` and ``metadata`` are only assigned by
    ``connect_to_db``; ``validate_tables`` and ``get_primary_key_name``
    read ``metadata`` and therefore must be called after it.
    """

    database_url: str
    tables: List[str]
    # Written as forward references so the third-party names are not
    # evaluated while the class body executes.
    database_engine: "engine.Engine"
    metadata: "MetaData"

    def __init__(
        self,
        adapter: str,
        host: str,
        port: int,
        username: str,
        password: str,
        database_name: str,
        tables: List[str],
    ) -> None:
        """
        Stores the list of tables and assembles the database URL

        :param adapter: Scheme part of the URL (dialect/driver)
        :param host: Host name of the database server
        :param port: Port of the database server
        :param username: User used to authenticate
        :param password: Password used to authenticate
        :param database_name: Name of the database to connect to
        :param tables: Names of the tables that should be exported
        """
        self.tables = tables
        # NOTE(review): the credentials are interpolated verbatim. Values
        # containing URL-reserved characters (e.g. "@" or "/") presumably
        # have to be percent-encoded by the caller -- confirm.
        self.database_url = (
            f"{adapter}://{username}:{password}@{host}:{port}/{database_name}")

    def connect_to_db(self) -> None:
        """
        Creates an engine that's used to connect to the database
        and introspects the database
        """
        logging.info("Connecting to SQL database...")
        self.database_engine = create_engine(self.database_url)
        self.metadata = MetaData()
        # Reflection fills metadata.tables, which the other methods consult.
        self.metadata.reflect(bind=self.database_engine)
        logging.info("Connected to database at %s",
                     self.database_engine.engine)

    def validate_tables(self) -> None:
        """Checks if all the tables defined in the config file
        exist in the database

        :raises KeyError: if at least one configured table was not
            found in the reflected metadata
        """
        tables_not_present: List[str] = [
            name for name in self.tables if name not in self.metadata.tables
        ]
        if tables_not_present:
            raise KeyError(
                f"The following tables do not exist in the database: {tables_not_present}"
            )

    def get_primary_key_name(self, table_name: str) -> str:
        """Gets the name of the first primary key of a given table
        As far as I know, MeiliSearch only supports one primary key
        Will be updated to support more than one if needed

        :param table_name: Name of a table present in the reflected metadata
        :return: Name of the first primary key column of the table
        :raises KeyError: if the table is not part of the reflected metadata
        :raises ValueError: if the table does not define a primary key
        """
        # Same lookup that validate_tables uses, instead of scanning every
        # reflected table for a matching name.
        try:
            table = self.metadata.tables[table_name]
        except KeyError:
            raise KeyError(
                f"Table {table_name!r} was not found in the database"
            ) from None
        primary_key_columns = table.primary_key.columns.values()
        if not primary_key_columns:
            # Fail with a clear message rather than an IndexError.
            raise ValueError(
                f"Table {table_name!r} does not define a primary key")
        return primary_key_columns[0].name
class MeilisearchConn:
    """
    Class that holds all the data and
    relevant methods pertaining to the
    Meilisearch connection
    """

    indexes: List[str]
    # Forward reference so the third-party name is not evaluated while the
    # class body executes.
    meilisearch_client: "meilisearch.Client"

    def __init__(self, host: str, api_key: str, indexes: List[str]) -> None:
        """
        Stores the index names and creates the Meilisearch client

        :param host: Address of the Meilisearch instance
        :param api_key: API key passed to the Meilisearch client
        :param indexes: Names of the target indexes, in table order
        """
        self.indexes = indexes
        self.meilisearch_client = meilisearch.Client(host, api_key)

    def upload_data_to_index(self, index: str, data: List[Dict],
                             primary_key: str) -> None:
        """
        Uploads a list to an index
        :param index: Name of the index to upload the table to
        :param data: List of dicts containing the data to be uploaded
        :param primary_key: Primary key of the index
        """
        self.meilisearch_client.get_index(index).update_documents(
            data, primary_key=primary_key)
        logging.info("Exported %d rows to Meilisearch index %s", len(data),
                     index)

    def validate_indexes(self, db_con: "DatabaseConn") -> None:
        """
        Checks if all the indexes specified in the config file
        exist in the given Meilisearch instance
        Also checks if the number of tables is mismatched and
        if so raises an exception.
        This script assumes that if there are more specified
        tables than there are indexes, that the corresponding indexes
        share the same name

        :param db_con: Database connection whose ``tables`` list is
            compared against ``self.indexes``
        :raises jsonschema.exceptions.ValidationError: if more indexes
            than tables were configured
        :raises KeyError: if an index is not present in Meilisearch
        """
        missing_count: int = len(db_con.tables) - len(self.indexes)
        if missing_count < 0:
            raise jsonschema.exceptions.ValidationError(
                "There cannot be more indexes than tables defined.")
        if missing_count > 0:
            # Tables without an explicit index are exported to an index
            # that carries the table's own name.
            self.indexes.extend(db_con.tables[len(self.indexes):])

        # The existence check runs for every configuration, including the
        # one where tables and indexes are equal in number. The index list
        # is requested a single time and reused for every lookup.
        existing_uids = {
            index["uid"] for index in self.meilisearch_client.get_indexes()
        }
        indexes_not_present: List[str] = [
            idx for idx in self.indexes if idx not in existing_uids
        ]
        if indexes_not_present:
            raise KeyError(
                f"The following indexes are not present in Meilisearch: {indexes_not_present}"
            )
def create_connection_from_dict(
        json_data: Json,
        schema_path: str) -> Tuple[DatabaseConn, MeilisearchConn]:
    """
    This function validates the configuration data against a schema
    and if it is successful, instantiates a DatabaseConn and a
    MeilisearchConn class.
    It also proceeds to create a connection to the database and also
    validates both the indexes and tables of the instantiated objects

    :param json_data: Parsed configuration; its "database" and
        "meilisearch" entries are expanded into the constructors'
        keyword arguments
    :param schema_path: Path of the JSON schema used for validation
    :return: The connected DatabaseConn and the MeilisearchConn
    """
    validate_json(json_data, schema_path)
    database: DatabaseConn = DatabaseConn(**json_data["database"])
    meili: MeilisearchConn = MeilisearchConn(**json_data["meilisearch"])
    # The database has to be reflected before its tables can be validated.
    database.connect_to_db()
    meili.validate_indexes(database)
    database.validate_tables()
    return database, meili
def get_schema(schema_path: str) -> Json:
    """Loads the schema from a file and returns it

    :param schema_path: Path of the JSON schema file
    :return: The parsed schema
    :raises ValueError: if the parsed schema is empty
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's
    # default text encoding.
    with open(schema_path, "r", encoding="utf-8") as schema_file:
        schema: Json = json.load(schema_file)
    if not schema:
        raise ValueError("Schema file was empty or failed to open.")
    return schema
def validate_json(json_data: Json, schema_path: str) -> None:
    """Validates a Json dict against the schema

    :param json_data: Parsed configuration data to validate
    :param schema_path: Path of the JSON schema file
    :raises jsonschema.exceptions.ValidationError: if the data does not
        conform to the schema
    """
    script_schema: Json = get_schema(schema_path)
    try:
        validate(instance=json_data, schema=script_schema)
    except jsonschema.exceptions.ValidationError as error:
        logging.critical(error)
        # Propagate the failure so callers do not go on to use a
        # configuration that failed validation.
        raise
def export_tables(db_con: DatabaseConn, meili: MeilisearchConn,
                  max_chunk_size: int) -> None:
    """Exports all the tables to their respective MeiliSearch indexes in
    chunks of at most ``max_chunk_size`` rows at a time

    :param db_con: Connected database whose ``tables`` are exported
    :param meili: Meilisearch connection; ``meili.indexes[i]`` receives
        the rows of ``db_con.tables[i]``
    :param max_chunk_size: Maximum number of rows sent in one upload
    :raises ValueError: if ``max_chunk_size`` is smaller than 1
    """
    if max_chunk_size < 1:
        # Zero would divide by zero below; a negative size yields an empty
        # range for non-empty tables, so none of their rows are exported.
        raise ValueError(
            f"max_chunk_size must be a positive integer, got {max_chunk_size}")

    for idx, table_name in enumerate(db_con.tables):
        logging.info("Starting the export of table %s", table_name)
        primary_key_name: str = db_con.get_primary_key_name(table_name)
        meili_index: str = meili.indexes[idx]
        table = db_con.metadata.tables[table_name]

        total_size: int = db_con.database_engine.execute(
            select([func.count()]).select_from(table)).scalar()

        if total_size < max_chunk_size:
            # Small table: a single query and a single upload.
            values: List = [
                dict(row)
                for row in db_con.database_engine.execute(table.select())
            ]
            meili.upload_data_to_index(meili_index, values, primary_key_name)
        else:
            chunks: int = ceil(total_size / max_chunk_size)
            for chunk in range(chunks):
                # Ordering by the primary key keeps the OFFSET/LIMIT
                # windows stable from one query to the next.
                query = (select(["*"]).select_from(table)
                         .order_by(primary_key_name)
                         .offset(chunk * max_chunk_size)
                         .limit(max_chunk_size))
                values = [
                    dict(row)
                    for row in db_con.database_engine.execute(query)
                ]
                meili.upload_data_to_index(meili_index, values,
                                           primary_key_name)

        logging.info("Finished exporting table %s to index %s", table_name,
                     meili_index)
def run_with_config_file(file_path: str, schema_path: str,
                         max_chunk_size: int) -> None:
    """Loads the configuration from a given path,
    and runs the export process

    :param file_path: Path of the JSON configuration file
    :param schema_path: Path of the JSON schema the configuration is
        validated against
    :param max_chunk_size: Maximum number of rows exported per upload
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's
    # default text encoding.
    with open(file_path, encoding="utf-8") as json_file:
        data: Json = json.load(json_file)
    db_conn, meili_conn = create_connection_from_dict(data, schema_path)
    export_tables(db_conn, meili_conn, max_chunk_size)
    logging.info("Finished exporting all tables to Meilisearch")
    logging.info(
        "You can browse the exported data at %s",
        meili_conn.meilisearch_client.config.url,
    )
def main() -> None:
    """
    Parses the command line arguments to ensure that a valid
    config file is passed

    Exits with status 2 when the chunk size is not a positive integer
    or when the schema or config path is not an existing file; otherwise
    the export is started.
    """
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser(
        description="Exports tables from a database to a Meilisearch instance")
    parser.add_argument(
        "-c",
        "--config",
        dest="path",
        metavar="FILE_PATH",
        action="store",
        help="Specifies the path for the config file",
        required=True,
        type=str,
    )
    parser.add_argument(
        "-s",
        "--schema",
        dest="schema",
        metavar="SCHEMA_FILE_PATH",
        action="store",
        help="Specifies the path for the schema file",
        required=False,
        type=str,
    )
    parser.add_argument(
        "-cs",
        "--chunk_size",
        dest="chunk_size",
        metavar="CHUNK_SIZE",
        action="store",
        help="Specifies the max number of rows to export in a single chunk",
        required=False,
        type=int,
    )
    args = parser.parse_args()

    file_path: str = args.path
    schema_path: str = args.schema if args.schema else "schema.json"
    # Compare against None so that an explicit 0 is not mistaken for
    # "option not given".
    max_chunk_size: int = 5000 if args.chunk_size is None else args.chunk_size
    if max_chunk_size < 1:
        # parser.error prints the usage text and exits with status 2.
        parser.error("chunk size must be a positive integer")

    if not isfile(schema_path):
        logging.critical("Specified schema file path is not a valid file!")
        sys_exit(2)
    if not isfile(file_path):
        logging.critical("Specified config file path is not a valid file!")
        sys_exit(2)

    run_with_config_file(file_path, schema_path, max_chunk_size)


# Start the command line interface only when run as a script.
if __name__ == "__main__":
    main()