-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathproxy_ip_pool.py
92 lines (77 loc) · 2.65 KB
/
proxy_ip_pool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# -*- coding: utf-8 -*-
# @Author : [email protected]
# @Time : 2023/12/2 13:45
# @Desc : ip代理池实现
import json
import pathlib
import random
from typing import List
import httpx
from tenacity import retry, stop_after_attempt, wait_fixed
from tools import utils
from .proxy_ip_provider import IpInfoModel, IpProxy
class ProxyIpPool:
    """
    Pool of proxy IPs that are fetched in batches and handed out one at a time.

    The pool is filled through ``IpProxy.get_proxies`` and every proxy returned by
    ``get_proxy`` is removed from the pool, so each entry is handed out at most once
    per load.
    """

    def __init__(self, ip_pool_count: int, enable_validate_ip: bool) -> None:
        """
        Create an empty pool; call ``load_proxies`` to fill it.

        :param ip_pool_count: number of proxies requested from the provider on each load
        :param enable_validate_ip: when true, ``get_proxy`` probes a proxy before returning it
        """
        self.valid_ip_url = "https://httpbin.org/ip"  # URL requested through a proxy to check that it works
        self.ip_pool_count = ip_pool_count
        self.enable_validate_ip = enable_validate_ip
        self.proxy_list: List[IpInfoModel] = []

    async def load_proxies(self) -> None:
        """
        Replace the pool contents with the result of ``IpProxy.get_proxies``.

        :return: None
        """
        self.proxy_list = await IpProxy.get_proxies(self.ip_pool_count)

    async def is_valid_proxy(self, proxy: IpInfoModel) -> bool:
        """
        Check whether a proxy works by requesting ``self.valid_ip_url`` through it.

        :param proxy: proxy to test
        :return: True when the probe request answers with HTTP 200, False for any other status
        :raises Exception: any error raised while sending the probe request is logged and re-raised
        """
        utils.logger.info(f"[ProxyIpPool.is_valid_proxy] testing {proxy.ip} is it valid ")
        try:
            # NOTE(review): the ``proxies=`` keyword is deprecated in newer httpx releases;
            # confirm the pinned httpx version before upgrading.
            httpx_proxy = {
                f"{proxy.protocol}": f"http://{proxy.user}:{proxy.password}@{proxy.ip}:{proxy.port}"
            }
            async with httpx.AsyncClient(proxies=httpx_proxy) as client:
                response = await client.get(self.valid_ip_url)
            return response.status_code == 200
        except Exception as e:
            utils.logger.info(f"[ProxyIpPool.is_valid_proxy] testing {proxy.ip} err: {e}")
            # Bare raise keeps the original traceback; get_proxy's retry relies on the error propagating.
            raise

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
    async def get_proxy(self) -> IpInfoModel:
        """
        Take one random proxy out of the pool and return it.

        The pool is reloaded first when it is empty. The method is wrapped in a tenacity
        retry (3 attempts, 1 second apart), so any exception raised here triggers another
        attempt with a different proxy.

        :return: the selected proxy, already removed from the pool
        :raises IndexError: when the pool is still empty after reloading
        :raises Exception: when validation is enabled and the selected proxy is not valid
        """
        if not self.proxy_list:
            await self.reload_proxies()
        if not self.proxy_list:
            # Fail with a clear message instead of random.choice's generic empty-sequence error.
            raise IndexError("[ProxyIpPool.get_proxy] proxy pool is empty after reload")

        proxy = random.choice(self.proxy_list)
        # Remove the proxy before validating it: a proxy that fails (or errors during)
        # validation must not remain in the pool, otherwise a retry could select it again.
        self.proxy_list.remove(proxy)
        if self.enable_validate_ip:
            if not await self.is_valid_proxy(proxy):
                raise Exception("[ProxyIpPool.get_proxy] current ip invalid and again get it")
        return proxy

    async def reload_proxies(self):
        """
        Empty the pool and load a fresh batch of proxies.

        :return: None
        """
        self.proxy_list = []
        await self.load_proxies()
async def create_ip_pool(ip_pool_count: int, enable_validate_ip) -> ProxyIpPool:
    """
    Build a ``ProxyIpPool`` and load its first batch of proxies.

    :param ip_pool_count: number of proxies the pool requests per load
    :param enable_validate_ip: passed to the pool; enables validation of a proxy in ``get_proxy``
    :return: the pool after ``load_proxies`` has completed
    """
    ip_pool = ProxyIpPool(
        ip_pool_count,
        enable_validate_ip,
    )
    # Fill the pool before returning it so the caller receives the result of the first load.
    await ip_pool.load_proxies()
    return ip_pool
if __name__ == "__main__":
    # Nothing to run: this module does no work when executed directly.
    pass