forked from hoytech/strfry
-
Notifications
You must be signed in to change notification settings - Fork 3
/
write-policy.py
99 lines (84 loc) · 2.3 KB
/
write-policy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/env python3
import sys
import json
# ENTER YOUR HEX PUBKEY(S) BELOW:
whitelist = {
"hex-pubkey-1",
"hex-pubkey-2"
}
# ENTER YOUR ALLOWED IP(S) BELOW:
sources = {
"1.1.1.1",
"8.8.8.8"
}
# ENTER YOUR ALLOWED TYPE(S) BELOW:
types = {
"Stream",
"Import",
"Sync"
}
def eprint(*args, **kwargs):
print(*args, **kwargs, file=sys.stderr, flush=True)
def accept(request):
response = {
'id' : request['event']['id']
}
response['action'] = 'accept'
r = json.dumps(response,separators=(',', ':')) # output JSONL
print(r, end='\n', file=sys.stdout, flush=True)
def reject(request):
response = {
'id' : request['event']['id']
}
response['action'] = 'reject'
response['msg'] = f"blocked: pubkey {request['event']['pubkey']} not in whitelist | SOURCE: {request['sourceInfo']}"
r = json.dumps(response,separators=(',', ':')) # output JSONL
print(r, end='\n', file=sys.stdout, flush=True)
def main():
for line in sys.stdin:
request = json.loads(line)
try:
if request['type'] == 'lookback':
continue
except KeyError:
eprint("input without type in write policy plugin")
continue
if request['type'] != 'new':
eprint("unexpected request type in write policy plugin")
continue
try:
if not request['event']['id']:
eprint("input without event id in write policy plugin")
continue
except KeyError:
eprint("input without event id in write policy plugin")
continue
try:
if request['event']['pubkey'] in whitelist:
accept(request)
continue
elif int(request['event']['kind']) == 10002:
accept(request)
continue
elif request['sourceType'] in types:
accept(request)
continue
elif request['sourceInfo'] in sources:
accept(request)
continue
elif request.get("event", {}).get("tags"):
if p_tags:= [x for x in request['event']['tags'] if x[0] == 'p']:
pubkeys = [x[1] for x in p_tags]
if whitelist.intersection(pubkeys):
accept(request)
continue
reject(request)
continue
else:
reject(request)
continue
except KeyError:
eprint("poorly formed event input in write policy plugin")
continue
if __name__=='__main__':
main()