-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathxidb.py
122 lines (97 loc) · 2.38 KB
/
xidb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import ipfshttpclient
import uuid
import json
import zlib
import time
import os
import cid
import binascii
import shutil
def getIpfs():
    """Open and return an IPFS HTTP client.

    The address is read from the ``IPFS_CONNECT`` environment variable.
    When the variable is unset or empty, ``ipfshttpclient.connect()`` is
    called without arguments so the library's default address is used.
    """
    address = os.environ.get('IPFS_CONNECT')
    if not address:
        return ipfshttpclient.connect()
    return ipfshttpclient.connect(address)
def checkIpfs():
    """Check whether an IPFS client connection can be established.

    Calls ``getIpfs()`` up to 10 times, printing a progress message and
    sleeping one second after every failed attempt.

    Returns:
        True as soon as one attempt succeeds, False if all 10 attempts fail.
    """
    for attempt in range(10):
        try:
            getIpfs()
        except Exception:
            # Catch Exception rather than everything so that Ctrl-C and
            # SystemExit can still interrupt the retry loop.
            print(attempt, "attempting to connect to IPFS...")
            time.sleep(1)
        else:
            return True
    return False
def verifyXid(xid):
    """Validate *xid* and return it in canonical UUID string form.

    An xid is accepted when it parses as a UUID and the zlib-compressed
    form of its 16 raw bytes is longer than the raw bytes themselves.
    Values that compress to 16 bytes or fewer are rejected and a
    diagnostic line is printed (presumably a guard against low-entropy,
    hand-made identifiers -- confirm with the xid specification).

    Args:
        xid: Candidate identifier, normally a string.

    Returns:
        ``str(uuid.UUID(xid))`` when the value is accepted, otherwise None.
    """
    try:
        parsed = uuid.UUID(xid)
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed string; TypeError / AttributeError: the
        # argument is None or not string-like at all.
        return None

    compressed = zlib.compress(parsed.bytes)
    if len(compressed) > len(parsed.bytes):
        return str(parsed)

    print(f"invalid {xid} compresses to {len(compressed)}")
    return None
def getXid(cid):
    """Look up the xid recorded for the IPFS object *cid* and validate it.

    Three locations are tried in order, falling through to the next one
    whenever fetching or parsing fails:

    1. the object itself, parsed as JSON, key ``'xid'``;
    2. ``<cid>/meta.json``, parsed as JSON, key ``'xid'``;
    3. ``<cid>/xid`` as plain text (deprecated layout).

    Args:
        cid: Content identifier to inspect.

    Returns:
        The result of ``verifyXid`` on the value found, or None when none
        of the locations yields a value.
    """
    ipfs = getIpfs()

    # Each lookup is a best-effort probe; Exception (not a bare except)
    # keeps KeyboardInterrupt / SystemExit propagating.
    try:
        xid = json.loads(ipfs.cat(cid))['xid']
    except Exception:
        pass
    else:
        return verifyXid(xid)

    try:
        xid = json.loads(ipfs.cat(cid + '/meta.json'))['xid']
    except Exception:
        pass
    else:
        return verifyXid(xid)

    try:
        # deprecated
        xid = ipfs.cat(cid + '/xid').decode().strip()
    except Exception:
        return None
    return verifyXid(xid)
def getMeta(cid):
    """Fetch and parse the JSON metadata stored for the IPFS object *cid*.

    The object itself is tried first; if it cannot be fetched or is not
    valid JSON, ``<cid>/meta.json`` is tried instead.

    Args:
        cid: Content identifier to read.

    Returns:
        The decoded JSON value, or None when neither location can be read
        and parsed.
    """
    ipfs = getIpfs()

    try:
        return json.loads(ipfs.cat(cid))
    except Exception:
        # Deliberate fallback to the directory layout below. Exception
        # (not a bare except) lets KeyboardInterrupt / SystemExit through.
        pass

    try:
        return json.loads(ipfs.cat(cid + '/meta.json'))
    except Exception:
        return None
def getVersions(cid):
    """Collect the metadata of every version reachable from *cid*.

    Starting at *cid*, the metadata returned by ``getMeta`` is followed
    through its ``'prev'`` entry until an entry without a previous version
    is reached. Each metadata dict is tagged with an ``'auth_cid'`` key
    holding the CID it was loaded from.

    The walk stops early, keeping what was gathered so far, when metadata
    for a CID cannot be loaded or is not a JSON object; a missing
    ``'prev'`` key is treated as the end of the chain.

    Args:
        cid: Content identifier of the newest version.

    Returns:
        List of metadata dicts ordered oldest first; empty when no
        metadata could be loaded for *cid*.
    """
    versions = []
    current = cid
    while current:
        version = getMeta(current)
        if not isinstance(version, dict):
            # getMeta returns None on failure; anything that is not a
            # JSON object cannot carry 'prev' / 'auth_cid' either.
            break
        version['auth_cid'] = current
        versions.append(version)
        current = version.get('prev')
    versions.reverse()
    return versions
def addCert(cert):
    """Add the path *cert* to IPFS recursively and return its hash.

    Args:
        cert: Path passed to the client's ``add`` call. The hash is taken
            from the result entry whose ``'Name'`` equals this value
            exactly.

    Returns:
        The ``'Hash'`` of the matching entry, or None when no entry has a
        matching name.
    """
    ipfs = getIpfs()
    added = ipfs.add(cert, recursive=True)

    # ipfshttpclient documents add() as returning a single mapping instead
    # of a list when only one entry was produced; normalise so the loop
    # below always sees entries rather than the mapping's keys.
    if not isinstance(added, list):
        added = [added]

    for entry in added:
        if entry['Name'] == cert:
            return entry['Hash']
    return None
def pin(cid):
    """Download *cid* from IPFS and add it back through the local client.

    The content is fetched with ``ipfs.get`` into a path named after the
    CID in the current working directory, added back recursively, and the
    local copy is then deleted. The copy is removed even when the add step
    fails, and both directory and single-file downloads are handled.

    Args:
        cid: Content identifier to fetch and re-add.

    Returns:
        True when every step succeeded, False on any error.
    """
    try:
        ipfs = getIpfs()
        ipfs.get(cid)
        try:
            ipfs.add(cid, recursive=True)
        finally:
            # Always clean up the download so a failed add does not leave
            # stray data behind in the working directory.
            if os.path.isdir(cid):
                shutil.rmtree(cid)
            elif os.path.exists(cid):
                os.remove(cid)
        return True
    except Exception:
        # Best-effort operation: report failure instead of raising, while
        # still letting KeyboardInterrupt / SystemExit propagate.
        return False
def encodeCid(hash):
    """Return the hex string of the CIDv1 byte form of *hash*.

    *hash* is turned into a CID object with ``cid.make_cid``, converted
    with ``to_v1()``, and the resulting ``buffer`` is hex-encoded.
    """
    v1_buffer = cid.make_cid(hash).to_v1().buffer
    hex_bytes = binascii.hexlify(v1_buffer)
    return hex_bytes.decode()