# merge.py -- 198 lines (182 loc), 6.37 KB
# NOTE: the source repository was archived by its owner on Oct 20, 2024 and is
# now read-only.
import click
import openpyxl
import csv as _csv
from type import MiniumScoreForUnivs, EnrollPlan, MiniumScoreForMajors
from typing import TypedDict, cast
from functions.hash import generate_random_hash
class Form(TypedDict):
    """Complete set of tables, one list of rows per table kind."""

    # Provincial score lines.
    province: list[MiniumScoreForUnivs]
    # Per-major score lines.
    major: list[MiniumScoreForMajors]
    # Enrollment plans.
    enroll: list[EnrollPlan]
# Comma-joined CSV header line for each known table layout, in the order:
# provincial score lines, enrollment plans, per-major score lines.
HEADERS = tuple(
    ','.join(columns)
    for columns in (
        # Provincial score lines.
        (
            'code', 'name', 'located_province', 'target_province',
            'major', 'year', 'enroll_level', 'enroll_type',
            'minium_score', 'minium_rank', 'prov_minium_score',
            'major_group', 'subject_requirements',
        ),
        # Enrollment plans.
        (
            'code', 'name', 'located_province', 'target_province',
            'year', 'major', 'enroll_level', 'major_name',
            'planned_number', 'duration', 'tuition',
            'major_group', 'subject_requirements',
        ),
        # Per-major score lines.
        (
            'code', 'name', 'located_province', 'target_province',
            'year', 'major', 'major_name', 'enroll_level',
            'avg_score', 'minium_score', 'minium_rank',
            'major_group', 'subject_requirements',
        ),
    )
)
# Human-readable (Chinese) column titles written as the first row of each
# XLSX worksheet, keyed by table kind. Every list has 13 entries, one per
# column of the corresponding entry in HEADERS.
READABLE_HEADERS = {
    # Provincial score lines.
    'province': [
        '学校代码',
        '学校名称全称',
        '所在省份',
        '面向省份',
        '科类',
        '年份',
        '录取批次',
        '招生类型',
        '最低分',
        '最低位次',
        '省控线',
        '专业组',
        '选科要求',
    ],
    # Enrollment plans.
    'enroll': [
        '学校代码',
        '学校名称全称',
        '所在省份',
        '面向省份',
        '年份',
        '科类',
        '招生批次',
        '招生专业名称',
        '计划招生',
        '学制',
        '学费',
        # A comma was missing here, so the last two titles were silently
        # concatenated into one string and the header row had only 12 columns.
        '专业组',
        '选科要求',
    ],
    # Per-major score lines.
    'major': [
        '学校代码',
        '学校名称全称',
        '所在省份',
        '面向省份',
        '年份',
        '科类',
        '录取专业名称',
        '录取批次',
        '平均分',
        '最低分',
        '最低位次',
        '专业组',
        '选科要求',
    ],
}
# Worksheet title used in XLSX files for each table kind.
NAME_DICT = dict(
    province='学校分数线',
    enroll='各专业招生计划',
    major='分专业录取分数线',
)
# Computed once at import time and embedded in every output file name.
# Presumably a random token so repeated runs do not overwrite each other;
# the exact format depends on generate_random_hash (defined elsewhere) --
# TODO confirm.
HASH = generate_random_hash()
def _is_blank_row(row) -> bool:
    """Return True when every cell in *row* is ``None`` or an empty string."""
    return all(value is None or value == '' for value in row.values())


@click.command('merge', help='合并多个CSV或XLSX文件,也可用于转换表格格式')
@click.option(
    '--csv', '-c',
    type=click.Path(
        exists=True, file_okay=True, dir_okay=False,
        readable=True, resolve_path=True
    ),
    multiple=True,
    help='CSV文件路径,此选项可重复使用'
)
@click.option(
    '--xlsx', '-x',
    type=click.Path(
        exists=True, file_okay=True, dir_okay=False,
        readable=True, resolve_path=True
    ),
    multiple=True,
    help='XLSX文件路径,此选项可重复使用'
)
@click.option(
    '--type', '-t',
    type=click.Choice(['csv', 'xlsx']),
    help='输出文件类型,默认为csv,输出的文件将位于当前工作目录下',
    default='csv'
)
@click.option(
    '--remove-empty-lines/--no-remove-empty-lines',
    default=True,
    help='是否使生成文件中不包含来自CSV源文件的空行,默认开启'
)
def merge(csv: list[str], xlsx: list[str], type: str, remove_empty_lines: bool):
    """Merge CSV and XLSX inputs and write them out as CSV or XLSX.

    Args:
        csv: Paths of CSV inputs. Each file is assigned to a table by
            comparing its header line with the entries of ``HEADERS``.
        xlsx: Paths of XLSX inputs. Worksheets are matched by the titles in
            ``NAME_DICT``; a row whose first cell is '学校代码' is treated as
            the header row and skipped.
        type: Output format, ``'csv'`` or ``'xlsx'``. (The name shadows the
            builtin, but it is fixed by the ``--type`` option.)
        remove_empty_lines: When true, rows whose cells are all empty are
            dropped before writing.

    Output goes to the current working directory: one
    ``<table>_output_<HASH>.csv`` per non-empty table, or a single
    ``output_<HASH>.xlsx`` with one worksheet per non-empty table.
    """
    form: Form = {
        'province': [],
        'enroll': [],
        'major': []
    }
    if not csv and not xlsx:
        ctx = click.get_current_context()
        ctx.fail('请至少指定一个CSV或XLSX文件')

    # Header line -> table key, used to classify CSV inputs.
    header_to_key = {
        HEADERS[0]: 'province',
        HEADERS[1]: 'enroll',
        HEADERS[2]: 'major',
    }
    for file in csv:
        # 'utf-8-sig' strips a leading BOM (otherwise the first column name
        # would never match); newline='' is what the csv module expects.
        with open(file, 'r', encoding='utf-8-sig', newline='') as f:
            reader = _csv.DictReader(f)
            fieldnames = reader.fieldnames
            if not fieldnames:
                # Empty file: fieldnames is None and cannot be joined.
                click.echo(f'跳过空的CSV文件: {file}', err=True)
                continue
            key = header_to_key.get(','.join(fieldnames))
            if key is None:
                # Report the skipped input instead of ignoring it silently.
                click.echo(f'跳过表头无法识别的CSV文件: {file}', err=True)
                continue
            form[key].extend(reader)  # type: ignore

    # Row type per table; its annotation order supplies the column keys for
    # worksheet rows.
    row_types = {
        'province': MiniumScoreForUnivs,
        'enroll': EnrollPlan,
        'major': MiniumScoreForMajors,
    }
    for file in xlsx:
        source_wb = openpyxl.load_workbook(file)
        for key, sheet_name in NAME_DICT.items():
            if sheet_name not in source_wb:
                continue
            field_names = list(row_types[key].__annotations__)
            for row in source_wb[sheet_name].rows:
                if row[0].value == '学校代码':
                    continue
                new_row = dict(zip(field_names, (cell.value for cell in row)))
                form[key].append(new_row)  # type: ignore

    if remove_empty_lines:
        # A row such as ",,," parses to a dict of empty strings, which is
        # truthy, so a plain truthiness filter never removed anything.
        for key in form:
            form[key] = [  # type: ignore
                row for row in form[key]  # type: ignore
                if not _is_blank_row(row)
            ]

    if not any(form.values()):
        # Nothing to write; this also avoids saving a workbook with no sheets.
        click.echo('没有可输出的数据', err=True)
        return

    if type == 'csv':
        for key, rows in form.items():
            if not rows:
                continue
            with open(f'{key}_output_{HASH}.csv', 'w', encoding='utf-8', newline='') as f:
                # Column order is taken from the first row of the table.
                writer = _csv.DictWriter(f, fieldnames=list(rows[0].keys()))  # type: ignore
                writer.writeheader()
                writer.writerows(rows)  # type: ignore
    elif type == 'xlsx':
        output_wb = openpyxl.Workbook()
        # Drop the default sheet so only the named table sheets remain.
        output_wb.remove(output_wb.active)  # type: ignore
        for key, rows in form.items():
            if not rows:
                continue
            ws = output_wb.create_sheet(NAME_DICT[key])
            ws.append(READABLE_HEADERS[key])
            for row in rows:  # type: ignore
                ws.append(list(row.values()))
        output_wb.save(f'output_{HASH}.xlsx')
if __name__ == '__main__':
    # Run as a script: hand control to the click command.
    merge()