-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathxmly_speed.py
128 lines (107 loc) · 4.6 KB
/
xmly_speed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import requests
import os
import re
import time
from datetime import datetime
from dateutil import tz
def loadFileContent(downloadUrl):
return requests.get(downloadUrl).text
def writeFile(content, fileName):
file = './'+fileName
with open(file, 'w', encoding='utf-8') as f:
f.write(content.replace('\r\n', '\n'))
def safe_cast(val, to_type, default=None):
try:
return to_type(val)
except (ValueError, TypeError):
return default
def readSecret(key):
if key in os.environ and not os.environ[key].strip() == '':
return os.environ[key]
else:
return None
def isOver():
hourLimit = readSecret("XMLY_ACCUMULATE_HOURS")
print("HOURS:"+str(safe_cast(hourLimit, int, -1)))
if not hourLimit is None and safe_cast(hourLimit, int, 0) > 0:
date_stamp = (int(time.time())-57600) % 86400
print(datetime.now(tz=tz.gettz('Asia/Shanghai')
).strftime("%Y-%m-%d %H:%M:%S", ))
print("今日已过秒数: ", date_stamp)
if date_stamp > int(hourLimit) * 60 * 60:
return True
else:
return False
else:
print("未配置XMLY_ACCUMULATE_HOURS,因此不会对执行时长进行限制")
return False
def isJumpIndex(idx):
indexLimit = readSecret("XMLY_ACCUMULATE_INDEX")
if indexLimit is None:
return False
xmly_accumulate_index = [safe_cast(i, int, -1)
for i in indexLimit.split(",")]
if len(xmly_accumulate_index) <= 0:
xmly_accumulate_index = None
index = -1
try:
index = xmly_accumulate_index.index(idx+1)
except ValueError:
print("无需禁用")
return index >= 0
def smartNotify(content):
notify_bark = readSecret("BARK_PUSH")
notify_serverJ = readSecret("PUSH_KEY")
# notify_tg_token = readSecret("TG_BOT_TOKEN")
# notify_tg_userId = readSecret("TG_USER_ID")
if not content:
return content
if notify_bark is not None:
print("bark通知已开启")
content = content.replace(
'bark_token = BARK', 'bark_token="'+notify_bark+'"', 1)
if notify_serverJ is not None:
print("server酱通知已开启")
content = content.replace(
'sckey = SCKEY', 'sckey="'+notify_serverJ+'"', 1)
# if notify_tg_token is not None and notify_tg_userId is not None:
# content = content.replace("", "")
# only for test
# content = content.replace(
# 'if _notify_time.split()[0] == str(notify_time) and int(_notify_time.split()[1]) > 30:', 'if True:', 1)
return content
def run():
cookies = readSecret("XMLY_SPEED_COOKIE")
if cookies is None:
print("无法获取Cookie,请在Secrets中配置XMLY_SPEED_COOKIE")
return
print("\n同步文件中...")
webFileContent = loadFileContent(os.environ["SYNCURL"])
print("\n文件同步完毕, 处理中...")
agentPattern = re.compile(r'UserAgent = \"[\d\D]*?\"', re.S)
rewriteAgent = readSecret("XMLY_ANDROID_AGENT")
if rewriteAgent is None:
rewriteAgent = 'UserAgent = "ting_1.8.30(Redmi+7,Android28)"'
else:
rewriteAgent = 'UserAgent = "' + rewriteAgent + '"'
for idx, xmlyCookie in enumerate(cookies.split('\n')):
executeContent = webFileContent.replace(
'xmly_speed_cookie = os.environ["XMLY_SPEED_COOKIE"]', 'xmly_speed_cookie = "' + xmlyCookie + '"', 1)
if xmlyCookie.find("_device=android") > 0: # 此时表示是获取的安卓的cookie,需要使用安卓的agent
executeContent = re.sub(agentPattern, rewriteAgent, executeContent)
if isOver():
executeContent = executeContent.replace("XMLY_ACCUMULATE_TIME = 1", "XMLY_ACCUMULATE_TIME = 0", 1).replace(
"action 自动刷时长打开", "action 自动刷时长2指定关闭", 1)
print("已接受XMLY_ACCUMULATE_HOURS配置,为索引" +
str(idx)+"的数据执行了禁用当天继续刷新时长的操作")
if isJumpIndex(idx+1):
executeContent = executeContent.replace("XMLY_ACCUMULATE_TIME = 1", "XMLY_ACCUMULATE_TIME = 0", 1).replace(
"action 自动刷时长打开", "action 自动刷时长被指定关闭", 1)
print("已接受XMLY_ACCUMULATE_INDEX配置,为索引" +
str(idx)+"的数据执行了禁用当天刷新时长的操作")
executeContent = smartNotify(executeContent)
writeFile(executeContent, 'execute'+str(idx)+'.py')
os.system('python ./'+'execute'+str(idx)+'.py')
print("\n***************************\n文件全部执行完毕")
exit(0)
run()