firewall.py
import re
import requests
import json
import logging
# Translation table: Cloudflare firewall rule action (key) -> action value
# placed in the rule payload that is posted to the ArvanCloud URL (value).
# Only "block" and "allow" are mapped.
action = dict(block="deny", allow="allow")
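class Firewall:
    """Copy Cloudflare firewall rules to an ArvanCloud firewall endpoint.

    The instance keeps the fields of the rule currently being translated
    (``country``, ``uri``, ``ip``, ``action``, ``desc``) as mutable state, so
    one instance must not be shared between concurrent migrations.

    ``cfkey`` is sent as a Cloudflare bearer token and ``arheader`` is sent
    verbatim as the headers of the ArvanCloud request; neither is logged here.
    """

    # Seconds to wait for either API. Without a timeout ``requests`` waits
    # indefinitely, which would hang the whole migration on a stalled peer.
    REQUEST_TIMEOUT = 30

    # Dots are escaped so that only the literal Cloudflare field names match.
    _COUNTRY_RE = re.compile(r'ip\.geoip\.country eq "(.+?)"')
    _URI_RE = re.compile(r'http\.request\.uri eq "(.+?)"')
    # Captures an IPv4/IPv6 address with an optional prefix length. The
    # address no longer has to be followed by "and", and surrounding spaces
    # or parentheses are not captured.
    _IP_RE = re.compile(r'ip\.src eq ([0-9A-Fa-f:.]+(?:/\d{1,3})?)')

    def __init__(self, url, arheader, cfkey):
        """Store the ArvanCloud URL and headers and the Cloudflare API key."""
        self.country = []
        self.uri = []
        self.ip = []
        self.action = ""
        self.desc = ""
        self.arvan_url = url
        self.arheader = arheader
        self.cfkey = cfkey

    def cloudmsg(self, response):
        """Return *response* prefixed with ``"Cloudflare: "`` for log lines."""
        return ''.join(["Cloudflare: ", response])

    def arvanmsg(self, response):
        """Return *response* prefixed with ``"Arvancloud: "`` for log lines."""
        return ''.join(["Arvancloud: ", response])

    def expression_parser(self, str):
        """Extract countries, URIs and source IPs from a Cloudflare expression.

        Only ``eq`` comparisons on ``ip.geoip.country``, ``http.request.uri``
        and ``ip.src`` are collected. The boolean structure of the expression
        (``and`` / ``or`` / ``not``) and every other field or operator are not
        interpreted, so the extracted lists are a flattened approximation of
        the original rule.
        """
        logging.debug("Analyzing the phrase firewall rule")
        self.country = self._COUNTRY_RE.findall(str)
        self.uri = self._URI_RE.findall(str)
        self.ip = self._IP_RE.findall(str)
        logging.info("The firewall rule phrase analysis was performed successfully")

    def get_firewall_rules(self, zone_id):
        """Fetch the firewall rules of *zone_id* from Cloudflare and migrate them."""
        logging.debug(self.cloudmsg("Receiving firewall rules"))
        url = "https://api.cloudflare.com/client/v4/zones/" + zone_id + "/firewall/rules"
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + self.cfkey,
        }
        response = requests.request(
            "GET", url, headers=headers, timeout=self.REQUEST_TIMEOUT
        ).json()
        if not response.get("success", True):
            # An error body carries no rule list; stop here instead of
            # failing later while iterating over a missing "result".
            logging.error(self.cloudmsg(
                "Firewall rules could not be received: {}".format(response.get("errors"))))
            return
        logging.info(self.cloudmsg("Firewall rules received"))
        # Only one page is requested. If the API reports more rules than it
        # returned, say so: a partial copy would leave the target less
        # protected than the source without any visible error.
        page_info = response.get("result_info") or {}
        if page_info.get("total_count", 0) > page_info.get("count", 0):
            logging.warning(self.cloudmsg(
                "Only the first page of firewall rules was received; remaining rules are not migrated"))
        self.response_parser(response)

    def response_parser(self, response):
        """Translate every rule in a Cloudflare response and send it on.

        Rules whose action has no entry in the module-level ``action`` table
        are skipped with a warning rather than aborting the run or being
        mapped to a different action. Rules from which no country, URI or
        source IP could be extracted are skipped as well, because the payload
        would otherwise carry a ``"**"`` URL pattern and an empty source list
        that does not represent the original rule.
        """
        logging.debug("Analyze the received response of the clodflare")
        for rule in response.get("result") or []:
            logging.debug("A firewall rule was found")
            description = rule.get("description", "")
            cf_action = rule["action"]
            mapped_action = action.get(cf_action)
            if mapped_action is None:
                logging.warning(
                    "Skipping firewall rule %r: action %r has no ArvanCloud equivalent",
                    description, cf_action)
                continue
            self.action = mapped_action
            self.desc = description
            self.expression_parser(rule["filter"]["expression"])
            if not (self.country or self.uri or self.ip):
                logging.warning(
                    "Skipping firewall rule %r: no supported condition found in its expression",
                    description)
                continue
            self.send_firewall_rules()

    def send_firewall_rules(self):
        """POST the currently parsed rule to the ArvanCloud URL.

        A 201 status is treated as success; any other status is logged as an
        error.
        """
        # Only the first extracted URI is sent; "**" is used when there is none.
        # NOTE(review): sources are the concatenation of countries and IPs, so
        # an "a and b" Cloudflare condition may be applied as "a or b" on the
        # target - confirm against the ArvanCloud rule semantics, especially
        # for allow rules.
        url_pattern = self.uri[0] if self.uri else "**"
        payload = {
            "name": self.desc,
            "url_pattern": url_pattern,
            "sources": self.country + self.ip,
            "action": self.action,
        }
        # json.dumps accepts only the object positionally on Python 3; the
        # former second positional argument raised TypeError.
        body = json.dumps(payload)
        print(body)
        response = requests.request(
            "POST", self.arvan_url, headers=self.arheader, data=body,
            timeout=self.REQUEST_TIMEOUT)
        if response.status_code == 201:
            logging.info(self.arvanmsg("This firewall rule was successfully created"))
        else:
            # url_pattern is used instead of self.uri[0], which raised
            # IndexError for rules without a URI condition.
            logging.error(self.arvanmsg(
                "An error occurred while registering the firewall rule - {} {} {}".format(
                    url_pattern, self.action, self.desc)))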