forked from himlpplm/collectSub
-
Notifications
You must be signed in to change notification settings - Fork 16
/
main.py
176 lines (166 loc) · 5.98 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import re
import os
import yaml
import threading
import base64
import requests
from loguru import logger
from tqdm import tqdm
from retry import retry
from pre_check import pre_check
new_sub_list = []
new_clash_list = []
new_v2_list = []
play_list = []
@logger.catch
def yaml_check(path_yaml):
print(os.path.isfile(path_yaml))
if os.path.isfile(path_yaml): #存在,非第一次
with open(path_yaml,encoding="UTF-8") as f:
dict_url = yaml.load(f, Loader=yaml.FullLoader)
else:
dict_url = {
"机场订阅":[],
"clash订阅":[],
"v2订阅":[],
"开心玩耍":[]
}
# with open(path_yaml, 'w',encoding="utf-8") as f:
# data = yaml.dump(dict_url, f,allow_unicode=True)
logger.info('读取文件成功')
return dict_url
@logger.catch
def get_config():
with open('./config.yaml',encoding="UTF-8") as f:
data = yaml.load(f, Loader=yaml.FullLoader)
list_tg = data['tgchannel']
new_list = []
for url in list_tg:
a = url.split("/")[-1]
url = 'https://t.me/s/'+a
new_list.append(url)
return new_list
@logger.catch
def get_channel_http(channel_url):
try:
with requests.post(channel_url) as resp:
data = resp.text
url_list = re.findall("https?://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]", data) # 使用正则表达式查找订阅链接并创建列表
logger.info(channel_url+'\t获取成功')
except Exception as e:
logger.warning(channel_url+'\t获取失败')
logger.error(channel_url+e)
url_list = []
finally:
return url_list
# @logger.catch
# def get_channel_http(channel_url):
# headers = {
# 'Referer': 'https://t.me/s/wbnet',
# 'sec-ch-ua-mobile': '?0',
# 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
# }
# try:
# with requests.post(channel_url,headers=headers) as resp:
# data = resp.text
# url_list = re.findall("https?://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]", data) # 使用正则表达式查找订阅链接并创建列表
# logger.info(channel_url+'\t获取成功')
# except Exception as e:
# logger.error('channel_url',e)
# logger.warning(channel_url+'\t获取失败')
# url_list = []
# finally:
# return url_list
def filter_base64(text):
ss = ['ss://','ssr://','vmess://','trojan://']
for i in ss:
if i in text:
return True
return False
@logger.catch
def sub_check(url,bar):
headers = {'User-Agent': 'ClashforWindows/0.18.1'}
with thread_max_num:
@retry(tries=2)
def start_check(url):
res=requests.get(url,headers=headers,timeout=5)#设置5秒超时防止卡死
if res.status_code == 200:
try: #有流量信息
info = res.headers['subscription-userinfo']
info_num = re.findall('\d+',info)
if info_num :
upload = int(info_num[0])
download = int(info_num[1])
total = int(info_num[2])
unused = (total - upload - download) / 1024 / 1024 / 1024
unused_rounded = round(unused, 2)
if unused_rounded > 0:
new_sub_list.append(url)
play_list.append('可用流量:' + str(unused_rounded) + ' GB ' + url)
except:
# 判断是否为clash
try:
u = re.findall('proxies:', res.text)[0]
if u == "proxies:":
new_clash_list.append(url)
except:
# 判断是否为v2
try:
# 解密base64
text = res.text[:64]
text = base64.b64decode(text)
text = str(text)
if filter_base64(text):
new_v2_list.append(url)
# 均不是则非订阅链接
except:
pass
else:
pass
try:
start_check(url)
except:
pass
bar.update(1)
if __name__=='__main__':
path_yaml = pre_check()
dict_url = yaml_check(path_yaml)
# print(dict_url)
list_tg = get_config()
logger.info('读取config成功')
#循环获取频道订阅
url_list = []
for channel_url in list_tg:
temp_list = get_channel_http(channel_url)
url_list.extend(temp_list)
logger.info('开始筛选---')
thread_max_num = threading.Semaphore(32) # 32线程
bar = tqdm(total=len(url_list), desc='订阅筛选:')
thread_list = []
for url in url_list:
# 为每个新URL创建线程
t = threading.Thread(target=sub_check, args=(url, bar))
# 加入线程池并启动
thread_list.append(t)
t.setDaemon(True)
t.start()
for t in thread_list:
t.join()
bar.close()
logger.info('筛选完成')
old_sub_list = dict_url['机场订阅']
old_clash_list = dict_url['clash订阅']
old_v2_list = dict_url['v2订阅']
new_sub_list.extend(old_sub_list)
new_clash_list.extend(old_clash_list)
new_v2_list.extend(old_v2_list)
new_sub_list = list(set(new_sub_list))
new_clash_list = list(set(new_clash_list))
new_v2_list = list(set(new_v2_list))
play_list = list(set(play_list))
dict_url.update({'机场订阅':new_sub_list})
dict_url.update({'clash订阅': new_clash_list})
dict_url.update({'v2订阅': new_v2_list})
dict_url.update({'开心玩耍': play_list})
with open(path_yaml, 'w',encoding="utf-8") as f:
data = yaml.dump(dict_url, f,allow_unicode=True)