-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
228 lines (176 loc) · 7.22 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# -*- coding: utf-8 -*-
""" Donkys Downloader by multi-range
使用http range和ftp REST请求的分段下载脚本
"""
import os
import time
import multiprocessing
import argparse
import requests
from abc import ABCMeta, abstractmethod, abstractstaticmethod
# Chunk size in bytes (16 KiB) used when iterating over a response body.
BUFFSIZE = 1024 * 16
class Downloader(metaclass=ABCMeta):
    """Abstract base class for downloaders that fetch a target in segments.

    Subclasses provide ``_get_length`` (which must store the size of the
    target, in bytes, in ``self.length``) and ``run_splited_task`` (which
    downloads one segment).

    Args:
        url: Location of the target to download.
        target_file_path: Path of the local file the target is written to.
        task_num: Requested number of segments / concurrent tasks.
    """

    def __init__(self, url, target_file_path, task_num):
        self.url = url
        self.target_file_path = target_file_path
        self.task_num = task_num
        # Size of the target in bytes; filled in by _get_length().
        self.length = None

    @abstractmethod
    def _get_length(self):
        """Determine the size of the target and store it in ``self.length``."""

    @abstractmethod
    def run_splited_task(self):
        """Download one segment of the target."""

    def split_range(self):
        """Split the target into contiguous, inclusive byte ranges.

        The target is divided as evenly as possible: the first
        ``length % n`` segments are one byte longer than the others, so
        the ranges never overlap and never extend past ``length - 1``.
        For example a length of 100 with 2 tasks yields
        ``[(0, 49), (50, 99)]``.

        Returns:
            A list of ``(start, end)`` tuples. It holds at most
            ``task_num`` entries; fewer when the target has fewer bytes
            than requested tasks, and none when the target is empty.

        Raises:
            ValueError: If the length is still unknown after calling
                ``_get_length`` or ``task_num`` is smaller than 1.
        """
        self._get_length()
        if self.length is None:
            raise ValueError("target length is unknown")
        if self.task_num < 1:
            raise ValueError("task_num must be at least 1")
        if self.length <= 0:
            return []

        # Never create more segments than there are bytes to download,
        # otherwise some ranges would be empty (start > end).
        segment_count = min(self.task_num, self.length)
        base_size, extra = divmod(self.length, segment_count)

        splited_range_list = []
        start = 0
        for index in range(segment_count):
            size = base_size + (1 if index < extra else 0)
            splited_range_list.append((start, start + size - 1))
            start += size
        return splited_range_list
class FTPDownloader(Downloader):
    """Downloader for FTP targets (segment download not implemented yet).

    Args:
        ftp_url: URL of the remote file.
        ftp_name: FTP user name.
        ftp_pass: FTP password.
        target_file_path: Path of the local file the target is written to.
        task_num: Requested number of segments / concurrent tasks.
    """
    # TODO: FTP downloader

    def __init__(self, ftp_url, ftp_name, ftp_pass, target_file_path, task_num):
        # Local import: the module's import block does not provide urlparse.
        from urllib.parse import urlparse

        self.ftp_name = ftp_name
        self.ftp_pass = ftp_pass
        # Remote path used by the SIZE command in _get_length().
        self.ftp_path = urlparse(ftp_url).path
        # FTP control connection; nothing in this class opens it yet, so it
        # has to be assigned before _get_length() is called.
        self.fd_ftp = None
        super().__init__(ftp_url, target_file_path, task_num)

    def _get_length(self):
        """Query the remote file size with ``SIZE`` and store it in ``self.length``.

        Returns:
            The size of the remote file in bytes.

        Raises:
            RuntimeError: If no FTP connection has been assigned to
                ``self.fd_ftp``.
        """
        if self.fd_ftp is None:
            raise RuntimeError("FTP connection has not been opened")
        ret = self.fd_ftp.sendcmd('SIZE %s' % self.ftp_path)
        # The reply is "<code> <size>"; the second field is the size.
        self.length = int(ret.split()[1])
        return self.length

    def run_splited_task(self):
        """Download one segment over FTP (not implemented)."""
        pass
class HTTPDownloader(Downloader):
    """Downloader that fetches a file over HTTP using ``Range`` requests.

    Args:
        http_url: URL of the remote file.
        target_file_path: Path of the local file the target is written to.
        task_num: Requested number of segments / concurrent tasks.
    """

    # Seconds to wait for the connection and between received chunks, so a
    # stalled server cannot block a worker forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, http_url, target_file_path, task_num):
        super().__init__(http_url, target_file_path, task_num)

    def _get_length(self):
        """Read the target size from a HEAD request into ``self.length``.

        Returns:
            The size of the target in bytes.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            ValueError: If the response carries no ``Content-Length``.
        """
        # requests.head() does not follow redirects by default, while the
        # GET used for the download does; follow them here as well so the
        # length belongs to the resource that is actually downloaded.
        response = requests.head(
            self.url,
            headers={
                'accept-encoding': 'gzip;q=0,deflate,sdch',
            },
            allow_redirects=True,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        print(response.headers)
        content_length = response.headers.get('Content-Length')
        if content_length is None:
            raise ValueError(
                "server did not report Content-Length for %s" % self.url)
        self.length = int(content_length)
        return self.length

    def run_splited_task(self, target_lock, size_done_array, size_full_array,
                         process_ordinal, range_start, range_end):
        """Download bytes ``range_start``..``range_end`` into the target file.

        Intended to run in a worker process. Progress is published through
        the shared arrays at index ``process_ordinal``.

        Args:
            target_lock: Lock guarding creation of the target file.
            size_done_array: Shared array; receives the bytes written so far.
            size_full_array: Shared array; receives the segment size.
            process_ordinal: Index of this task in the shared arrays.
            range_start: First byte of the segment (inclusive).
            range_end: Last byte of the segment (inclusive).

        Returns:
            True when the whole segment was written, False otherwise.
            Errors are printed rather than raised, because an exception in
            a pool worker would otherwise go unnoticed.
        """
        pid = os.getpid()
        segment_size = range_end - range_start + 1
        print("run task %d, range: %s-%s, process id: %s." % (
            process_ordinal, range_start, range_end, pid
        ))
        try:
            size_full_array[process_ordinal] = segment_size

            with target_lock:
                # Make sure the file exists: "rb+" below cannot create it,
                # and "a" creates it without truncating existing content.
                with open(self.target_file_path, 'a'):
                    pass

            # Stream the body so the segment is not buffered in memory and
            # the progress counter advances while data is arriving.
            with requests.get(
                self.url,
                headers={
                    'Range': 'bytes=%s-%s' % (range_start, range_end),
                    'accept-encoding': 'gzip;q=0,deflate,sdch',
                },
                stream=True,
                timeout=self.REQUEST_TIMEOUT,
            ) as response:
                response.raise_for_status()
                # Anything but 206 means the full body is being sent. That
                # is only usable for the segment that starts at offset 0.
                if response.status_code != 206 and range_start != 0:
                    raise RuntimeError(
                        "server ignored the Range header (HTTP %s)"
                        % response.status_code)

                remaining = segment_size
                # "rb+" lets every process fill in its own region:
                # "w" would truncate what other processes already wrote and
                # "a" can only append at the end of the file.
                # Each process has its own file object and writes a disjoint
                # region, so the writes themselves need no lock.
                with open(self.target_file_path, "rb+") as fd_target:
                    fd_target.seek(range_start, os.SEEK_SET)
                    for buff in response.iter_content(chunk_size=BUFFSIZE):
                        if remaining <= 0:
                            break
                        # Never write past the end of this segment.
                        buff = buff[:remaining]
                        fd_target.write(buff)
                        remaining -= len(buff)
                        size_done_array[process_ordinal] += len(buff)

                if remaining > 0:
                    raise IOError(
                        "connection closed with %d bytes missing" % remaining)
        except Exception as e:
            # TODO: let the main process catch this and re-run the task.
            print(e)
            print("process %s failed!" % pid)
            return False
        print("process %s ok!" % pid)
        return True
def optioninit():
    """Parse and validate the command-line arguments.

    Returns:
        An ``argparse.Namespace`` with ``url`` (str), ``output`` (str,
        default ``"a.out"``) and ``tasks`` (int >= 1, default 2).

    Raises:
        SystemExit: Raised by argparse when the arguments are invalid,
            including a task count smaller than 1.
    """

    def positive_int(text):
        # Reject 0 and negative task counts up front; they cannot be
        # turned into a meaningful set of byte ranges later on.
        value = int(text)
        if value < 1:
            raise argparse.ArgumentTypeError(
                "must be a positive integer, got %r" % text)
        return value

    parser = argparse.ArgumentParser(
        description='A testing simulation of HTTP Range and FTP RETR request with threads.')
    parser.add_argument(
        'url', help='url target', type=str)
    parser.add_argument(
        '-o', '--output', help='saved target path', type=str, default="a.out")
    parser.add_argument(
        '-t', '--tasks', help='number of tasks', type=positive_int, default=2)
    return parser.parse_args()
def main():
    """Download the requested URL in parallel segments and report progress.

    Parses the command line, splits the target into byte ranges, hands
    every range to a pool of worker processes and prints the per-task
    progress until all tasks are complete or the workers have stopped.

    Raises:
        SystemExit: With status 1 when the workers finished without
            completing every segment.
    """
    # Arguments: url, output file name, number of concurrent tasks.
    args = optioninit()
    print(args)

    # TODO: pick FTPDownloader for ftp:// URLs once it is implemented.
    downloader = HTTPDownloader(args.url, args.output, args.tasks)

    # Compute the byte range of every segment.
    splited_range_list = downloader.split_range()
    print(splited_range_list)
    # Size everything by the ranges actually produced, not by args.tasks,
    # so the completion check below always matches the submitted tasks.
    task_count = len(splited_range_list)

    # Process pool, capped at twice the CPU count (a pool needs >= 1).
    max_process_num = multiprocessing.cpu_count() * 2
    process_num = max(1, min(task_count, max_process_num))

    # One manager serves the lock and both arrays; every Manager() call
    # would otherwise start its own server process.
    with multiprocessing.Manager() as manager:
        # Lock guarding the target file.
        target_lock = manager.Lock()
        # Per-task bytes done / bytes total. 'Q' (unsigned 64-bit) keeps
        # segments of 4 GiB and more from overflowing the counters.
        size_done_array = manager.Array('Q', [0] * task_count)
        size_full_array = manager.Array('Q', [0] * task_count)

        with multiprocessing.Pool(process_num) as process_pool:
            # The ordinal of a task is its index in the shared arrays.
            results = [
                process_pool.apply_async(
                    downloader.run_splited_task,
                    args=(target_lock, size_done_array, size_full_array,
                          process_ordinal) + tuple(splited_range),
                )
                for process_ordinal, splited_range
                in enumerate(splited_range_list)
            ]
            process_pool.close()

            # The main process prints the download progress.
            succeeded = False
            while True:
                # Sample this before reading the counters: if every worker
                # had already returned, the counters read below are final.
                workers_finished = all(result.ready() for result in results)

                done_count = 0
                for i in range(task_count):
                    current_size = size_done_array[i]
                    total_size = size_full_array[i]
                    if total_size == 0:
                        # Task has not started yet.
                        continue
                    print(' || %d size downloaded: %.2f %%' % (
                        i, (current_size * 100 / total_size)
                    ), end='')
                    if current_size >= total_size:
                        done_count += 1
                print()

                if done_count == task_count:
                    print('Downloaded well done!')
                    succeeded = True
                    break
                if workers_finished:
                    # No worker is left to make progress; stop instead of
                    # polling forever.
                    print('Download failed: %d of %d tasks completed.' % (
                        done_count, task_count))
                    break
                time.sleep(0.2)

            # Let the workers return before the pool is torn down.
            process_pool.join()

    if not succeeded:
        raise SystemExit(1)
# Start the download only when this file is executed as a script, not when
# it is imported (for example by a multiprocessing child process).
if __name__ == "__main__":
    main()