-
Notifications
You must be signed in to change notification settings - Fork 10
/
mysql-slave-sync-table
executable file
·242 lines (195 loc) · 7.56 KB
/
mysql-slave-sync-table
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
#!/usr/bin/env python3
# mysql-slave-sync-table (part of ossobv/vcutil) // wdoekes/2023
# // Public Domain
#
# This mysql-slave-sync-table script is dirty way to bring an old-fashioned
# MySQL slave (secondary) table in sync with the master (primary).
#
# It will:
# - open a connection to the primary and secondary;
# - loop over both, while fetching the lowest IDs from the specified table;
# - if the ID is found only on the primary, it is copied to the secondary;
# - if the ID is found only on the secondary, it is removed from there.
#
# Lastly, a dump of the entire table is fetched from both sides and compared.
# This is done twice. If the same IDs show up then things might be out of sync.
#
# Usage:
#
# mysql-slave-sync-table primary.cnf client secondary.cnf client \
# DATABASE TABLE
#
# See also:
#
# mysql-slave-skip-one-table
#
from configparser import ConfigParser
from collections import namedtuple
from datetime import datetime
from time import sleep
from MySQLdb import connect
Conns = namedtuple('Conns', 'primary secondary')
DBTable = namedtuple('DBTable', 'db name')
def now():
return datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
def read_my_cnf(conffile, section):
# Read mysql config file:
# [section]
# host=host
# user=user
# password=password
confparser = ConfigParser(interpolation=None)
assert confparser.read([conffile]) == [conffile], (
f'Error reading {conffile!r}')
assert section in confparser.sections(), confparser.sections()
return [
confparser[section][i] for i in ('host', 'user', 'password')]
def mysql_connect(conffile, section):
host, user, password = read_my_cnf(conffile, section)
conn = connect(host=host, user=user, password=password)
return conn
def find_first_record_by_id(cursor, dbtable, id_above=0):
cursor.execute(
(f'SELECT id FROM {dbtable.db}.{dbtable.name} WHERE id > %s '
'ORDER BY id LIMIT 1'), (id_above,))
res = cursor.fetchall()
assert len(res) <= 1, res
if not res:
return None
return res[0][0]
def execute(cursor, stmt, args):
print(f'-- {now()}')
print(stmt % args)
cursor.execute(stmt, args)
def delete_record_from_secondary(conn, dbtable, id_):
with conn.cursor() as cursor:
execute(
cursor,
f'DELETE FROM {dbtable.db}.{dbtable.name} WHERE id = %s',
(id_,))
conn.commit()
def copy_record_to_secondary(
cursor_primary, conn_secondary, dbtable, id_):
cursor_primary.execute(
f'SELECT * FROM {dbtable.db}.{dbtable.name} WHERE id = %s', (id_,))
res = cursor_primary.fetchall()
assert len(res) == 1, res
res = res[0]
args = ', '.join(['%s'] * len(res))
with conn_secondary.cursor() as cursor_secondary:
execute(
cursor_secondary,
f'INSERT INTO {dbtable.db}.{dbtable.name} VALUES ({args})',
res)
conn_secondary.commit()
def get_all_records(cursor, dbtable):
cursor.execute(f'SELECT * FROM {dbtable.db}.{dbtable.name}')
return cursor.fetchall()
def linear_scan_by_id_and_fix(conns, dbtable, start_at=0):
cursor_primary = conns.primary.cursor()
cursor_secondary = conns.secondary.cursor()
while True:
id_primary = find_first_record_by_id(
cursor_primary, dbtable, start_at)
id_secondary = find_first_record_by_id(
cursor_secondary, dbtable, start_at)
if id_primary == id_secondary:
# Found on both. Done if id_primary is not found.
if id_primary is None:
print(f'{now()}: done!')
break
start_at = id_primary
elif id_primary < id_secondary or id_secondary is None:
# Not found on primary. Copy to secondary.
copy_record_to_secondary(
cursor_primary, conns.secondary, id_primary)
start_at = id_primary
elif id_primary > id_secondary or id_primary is None:
# Not found on secondary. Remove from there.
delete_record_from_secondary(
conns.secondary, id_secondary)
start_at = id_secondary
else:
assert False, (id_primary, id_secondary)
def check_entire_table_diff(conns, dbtable, verbose=False):
with conns.primary.cursor() as cursor_primary:
values_primary = get_all_records(cursor_primary, dbtable)
columns = [i[0] for i in cursor_primary.description]
with conns.secondary.cursor() as cursor_secondary:
values_secondary = get_all_records(cursor_secondary, dbtable)
values_primary = tuple(sorted(values_primary))
values_secondary = tuple(sorted(values_secondary))
diffs = []
ida = idb = 0
while True:
if ida is not None and ida >= len(values_primary):
ida = None
if idb is not None and idb >= len(values_secondary):
idb = None
if ida is None and idb is None:
break
if ida is not None and (
idb is None or
values_primary[ida][0] < values_secondary[idb][0]):
if verbose:
print('(only on primary)', values_primary[ida])
diffs.append(('src-only', values_primary[ida][0]))
ida += 1
elif idb is not None and (
ida is None or
values_primary[ida][0] > values_secondary[idb][0]):
if verbose:
print('(only on secondary)', values_secondary[idb])
diffs.append(('dst-only', values_secondary[idb][0]))
idb += 1
elif values_primary[ida] == values_secondary[idb]:
ida += 1
idb += 1
else:
id_ = values_primary[ida][0]
if verbose:
print(f'(diff of id={id_}) values differ:')
for idx, (col_primary, col_secondary) in enumerate(
zip(values_primary[ida], values_secondary[idb])):
if col_primary != col_secondary:
print(
f' column {columns[idx]}: '
f'{col_primary!r} != {col_secondary!r}')
diffs.append(('diff', id_))
ida += 1
idb += 1
return diffs
def sql_linear_table_sync(
conffile_primary, section_primary,
conffile_secondary, section_secondary,
database, table):
dbtable = DBTable(db=database, name=table)
conn_primary = mysql_connect(conffile_primary, section_primary)
conn_secondary = mysql_connect(conffile_secondary, section_secondary)
conns = Conns(primary=conn_primary, secondary=conn_secondary)
verbose = True
verbose = False
if 1:
# This is slow because it does a round trip per connection per ID.
print(f'Doing slow sync/check on {dbtable.db}.{dbtable.name} now...')
linear_scan_by_id_and_fix(conns, dbtable, start_at=0)
# Fast. Check all records at once. Do this for small tables (<50k records)
# only.
res1 = set(check_entire_table_diff(conns, dbtable, verbose))
sleep(1)
res2 = set(check_entire_table_diff(conns, dbtable, False))
if (res1 & res2):
print('Not everything is in sync.. See IDs:', (res1 & res2))
elif res1 or res2:
print('Everything appears to sync up in a timely manner!')
else:
print('Everything is in sync!')
if __name__ == '__main__':
import sys
if len(sys.argv) == 7:
sql_linear_table_sync(*sys.argv[1:])
else:
print(
f'Usage: {sys.argv[0]} primary.cnf client secondary.cnf client '
f'DATABASE TABLE', file=sys.stderr)
exit(1)