-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathminidb.py
122 lines (98 loc) · 3.75 KB
/
minidb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# minimal seatbelt-like realtime websocket database thing
from autobahn.twisted.websocket import WebSocketServerProtocol, \
WebSocketServerFactory
from autobahn.twisted.resource import WebSocketResource
from twisted.web.static import File
from twisted.web.server import Site
from twisted.internet import reactor
import gzip
import json
import os
class DBFactory(WebSocketServerFactory):
def __init__(self, dbdir="db"):
WebSocketServerFactory.__init__(self)
self.clients = {} # peerstr -> client
self.dbdir = dbdir
self.docs = {} # _id -> {doc}
if os.path.exists(dbdir):
self.load_db()
else:
os.makedirs(dbdir)
# Create a changelog
self.change_fh = open(os.path.join(self.dbdir, '_changes'), 'w')
def get(self, key, default=None):
return self.docs.get(key, default)
def __getitem__(self, key):
return self.docs.__getitem__(key)
def __setitem__(self, key, value):
# TODO
pass
def __delitem__(self, key):
# TODO
pass
def load_db(self):
# Load into self.db
dbpath = os.path.join(self.dbdir, 'db.json')
if os.path.exists(dbpath):
self.docs = json.load(open(dbpath))
# Incorporate changes
changes_file = os.path.join(self.dbdir, "_changes")
if os.path.exists(changes_file):
for line in open(changes_file):
if len(line.strip()) > 2:
c = json.loads(line)
self.update_inmem(c)
# Serialize to db.json
json.dump(self.docs, open(dbpath, 'w'), indent=2)
# Archive _changes
changes_fh = open(changes_file)
changes_out_pattern = os.path.join(self.dbdir, "_changes.%d.gz")
changes_out_idx = 1
while os.path.exists(changes_out_pattern % (changes_out_idx)):
changes_out_idx += 1
changes_out_fh = gzip.open(changes_out_pattern % (changes_out_idx), 'wb')
changes_out_fh.writelines(changes_fh)
changes_out_fh.close()
changes_fh.close()
def register(self, client):
self.clients[client.peer] = client
# Send history
client.sendMessage(json.dumps(
{"type": "history",
"history": self.docs}))
def unregister(self, client):
if client.peer in self.clients:
del self.clients[client.peer]
def update_inmem(self, change_doc):
if change_doc['type'] == 'delete':
del self.docs[change_doc['id']]
else:
self.docs[change_doc['id']] = change_doc['doc']
def onchange(self, sender, change_doc):
self.update_inmem(change_doc)
self.change_fh.write("%s\n" % (json.dumps(change_doc)))
self.change_fh.flush()
for client in self.clients.values():
if client != sender:
client.sendMessage(json.dumps(change_doc))
class DBProtocol(WebSocketServerProtocol):
def onOpen(self):
self.factory.register(self)
WebSocketServerProtocol.onOpen(self)
def connectionLost(self, reason):
self.factory.unregister(self)
WebSocketServerProtocol.connectionLost(self, reason)
def onMessage(self, payload, isBinary):
if not isBinary:
change_doc = json.loads(payload)
self.factory.onchange(self, change_doc)
if __name__=='__main__':
factory = DBFactory()
factory.protocol = DBProtocol
ws_resource = WebSocketResource(factory)
root = File('.')
root.putChild('_db', ws_resource)
site = Site(root)
reactor.listenTCP(9669, site, interface='0.0.0.0')
print 'http://localhost:9669'
reactor.run()