forked from rwth-i6/returnn
-
Notifications
You must be signed in to change notification settings - Fork 0
/
NetworkStream.py
56 lines (50 loc) · 1.79 KB
/
NetworkStream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from TaskSystem import AsyncTask, ProcConnectionDied
from Log import log
import numpy
import json
try:
import SimpleHTTPServer
import SocketServer
import BaseHTTPServer
except ImportError: # Python3
import http.server as SimpleHTTPServer
import socketserver as SocketServer
BaseHTTPServer = SimpleHTTPServer
try:
from thread import start_new_thread
except ImportError:
# noinspection PyUnresolvedReferences
from _thread import start_new_thread
class NetworkStream:
    """
    Keeps a bounded cache of the most recent data packages and exposes them
    through a JSON-RPC server running in a background thread.

    Two RPC functions are registered: ``count`` (how many packages were
    published so far) and ``data`` (fetch cached packages by global index).

    NOTE(review): ``update`` runs on the caller's thread while ``count`` and
    ``data`` are invoked from the server thread; no locking is done here.
    """

    class ThreadingServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
        """HTTP server that uses the threading mix-in for request handling."""
        pass

    def count(self):
        """
        :return: dict with the single key ``'count'``, the total number of
          packages passed to :func:`update` so far
        :rtype: dict[str,int]
        """
        return {'count': self.counter}

    def data(self, start_index, count=1):
        """
        Return up to ``count`` cached packages, starting at the package with
        global index ``start_index`` (0-based, counted over all updates).

        :param int start_index: global index of the first requested package
        :param int count: maximum number of packages to return. The result is
          truncated at the newest cached package; a value <= 0 yields ``[]``.
        :return: list of package dicts, or ``{'error': ...}`` if
          ``start_index`` was already evicted from the cache ("too old") or
          has not been published yet ("too new")
        :rtype: list[dict]|dict[str,str]
        """
        available = len(self.cache)
        # The cache holds the global indices [counter - available, counter);
        # translate the global index into a position inside the cache list.
        idx = available - (self.counter - start_index)
        if idx < 0:
            return {'error': 'requested batch too old'}
        # Compare against the current fill level, not the capacity: while the
        # cache is still filling up, positions >= available do not exist yet.
        if idx >= available:
            return {'error': 'requested batch too new'}
        # A negative count would make the slice end wrap around the list.
        count = max(count, 0)
        # Slicing clamps the upper bound to the end of the list by itself.
        return self.cache[idx:idx + count]

    def __init__(self, name, port, cache_size=100):
        """
        Set up the empty cache and start the JSON-RPC server thread.

        :param name: stored as ``self.name``; not used otherwise in this class
        :param int port: TCP port the server binds to, on all interfaces
        :param int cache_size: maximum number of packages kept in the cache
        """
        self.name = name
        self.cache = []
        self.cache_size = cache_size
        self.counter = 0
        # Imported lazily so the dependency is only needed when a stream is
        # actually created. https://pypi.python.org/pypi/jsonrpclib/0.1.6
        from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
        server = SimpleJSONRPCServer(('0.0.0.0', port))
        server.register_function(self.count, 'count')
        server.register_function(self.data, 'data')
        # Write to the log stream directly: the `print >> stream` statement is
        # Python 2 only and raises TypeError when evaluated on Python 3.
        log.v3.write("json-rpc streaming on port %s\n" % (port,))
        start_new_thread(server.serve_forever, ())

    def update(self, task, data, tags=None):
        """
        Publish a new package: append it to the cache, evict the oldest entry
        if the cache exceeds ``cache_size``, and increase the global counter.

        :param task: stored unchanged under the ``'task'`` key
        :param data: anything accepted by ``numpy.asarray``; stored as raw
          bytes together with its dtype string and shape
        :param list|None tags: stored under the ``'tags'`` key; a fresh empty
          list is used when omitted
        """
        # Do not share one default list object between all packages.
        tags = [] if tags is None else tags
        data = numpy.asarray(data)
        package = {
            'task': task,
            # tobytes() returns the same bytes as the deprecated tostring().
            'data': data.tobytes(),
            'tags': tags,
            'dtype': str(data.dtype),
            'shape': data.shape,
        }
        self.cache.append(package)
        if len(self.cache) > self.cache_size:
            self.cache.pop(0)
        self.counter += 1