This repository has been archived by the owner on May 14, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpacked_stream.py
82 lines (67 loc) · 2.61 KB
/
packed_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import io
import struct
from typing import List
# Maximum number of bytes requested per read() call while copying stream
# payloads (4 KiB).
BUFFER_SIZE = 4 * 1024
class StreamPackWriter:
    """Sequentially write several input streams into one output stream.

    Input streams must be seekable: the length of each stream is measured
    by seeking to its end before any of its content is copied.

    Output format: the number of streams, followed by one (size, chunk)
    pair per input stream, where size is the number of bytes in the chunk
    and chunk is the stream content from its current position to its end.
    All integers are 8-byte unsigned big-endian.
    """

    def __init__(self, dest: io.BufferedWriter) -> None:
        """Create a writer that packs its inputs into ``dest``.

        Args:
            dest: Writable binary stream receiving the packed output.
        """
        self.inputs = []
        self.output = dest

    def add_input(self, stream: io.BufferedReader) -> None:
        """Queue ``stream`` to be packed by :meth:`transmit`.

        Args:
            stream: Seekable, readable binary stream. Only the data
                between its current position and its end is packed.
        """
        self.inputs.append(stream)

    def transmit(self) -> None:
        """Write the stream count, then every queued stream, to the output.

        Streams are written in the order they were added. The queue is not
        cleared afterwards.
        """
        self.output.write(struct.pack(">Q", len(self.inputs)))
        for stream in self.inputs:
            # Measure the remaining length without consuming any data:
            # jump to the end, take the difference, then restore the
            # original position.
            start = stream.tell()
            stream.seek(0, io.SEEK_END)
            size = stream.tell() - start
            stream.seek(start)
            self._transmit_stream(stream, size)

    def _transmit_stream(self, stream, size) -> None:
        """Write one (size, chunk) pair for ``stream`` to the output.

        Args:
            stream: Readable binary stream positioned at the first byte
                to copy; it is read until it reports end of data.
            size: Number of bytes announced in the 8-byte size header.
        """
        self.output.write(struct.pack(">Q", size))
        # Copy in BUFFER_SIZE pieces so the whole payload is never held in
        # memory at once. Nothing is echoed to stdout: the payload is
        # arbitrary binary data.
        chunk = stream.read(BUFFER_SIZE)
        while chunk:
            self.output.write(chunk)
            chunk = stream.read(BUFFER_SIZE)
class StreamPackReader:
    """Decode a stream pack back into its original streams.

    The expected input is an 8-byte unsigned big-endian stream count,
    followed by one (size, chunk) pair per stream, where size is an 8-byte
    unsigned big-endian integer giving the chunk length in bytes.
    """

    def __init__(self, source: io.BufferedReader) -> None:
        """Read the stream count from ``source``.

        Args:
            source: Readable binary stream positioned at the start of a
                stream pack.

        Raises:
            struct.error: If fewer than 8 bytes are available for the
                stream count.
        """
        self.source = source
        stream_count_enc = source.read(8)
        self.stream_count = struct.unpack(">Q", stream_count_enc)[0]
        self.outputs = None

    def set_outputs(self, outputs: List[io.BufferedWriter]) -> None:
        """Remember ``outputs`` as the default destinations for :meth:`receive`.

        Args:
            outputs: Writable binary streams, one per packed stream.
        """
        self.outputs = outputs

    def receive(self, outputs=None) -> None:
        """Unpack every stream from the source into ``outputs``, in order.

        Args:
            outputs: Writable binary streams, one per packed stream. When
                omitted, the list given to :meth:`set_outputs` is used.

        Raises:
            ValueError: If no outputs are available, or their number does
                not equal ``stream_count``.
            EOFError: If the source ends before a stream's announced size
                has been read.
            struct.error: If the source ends inside a size header.
        """
        if outputs is None:
            # Fall back to the list registered through set_outputs().
            outputs = self.outputs
        if outputs is None:
            raise ValueError("No outputs given")
        if len(outputs) != self.stream_count:
            raise ValueError("Output count mismatch")
        for stream in outputs:
            self._receive_stream(stream)

    def _receive_stream(self, output) -> None:
        """Copy the next (size, chunk) pair from the source into ``output``.

        Args:
            output: Writable binary stream receiving the chunk.

        Raises:
            EOFError: If the source ends before the whole chunk is read.
        """
        stream_size_enc = self.source.read(8)
        stream_size = struct.unpack(">Q", stream_size_enc)[0]
        remaining = stream_size
        while remaining > 0:
            # Never request more than what is left of this chunk, so the
            # next stream's header is not consumed.
            buf = self.source.read(min(BUFFER_SIZE, remaining))
            if not buf:
                # An empty read means the source is exhausted; without this
                # check the loop would spin forever on truncated input.
                raise EOFError(
                    "Source ended with %d of %d bytes still expected"
                    % (remaining, stream_size)
                )
            remaining -= len(buf)
            output.write(buf)
if __name__ == "__main__":
    # Round-trip demo: pack two small in-memory streams into one buffer,
    # then unpack that buffer and print what was recovered.
    dest = io.BytesIO()
    pack_writer = StreamPackWriter(dest)
    for text in ("qwerty", "1337"):
        pack_writer.add_input(io.BytesIO(text.encode()))
    pack_writer.transmit()

    # Rewind so the reader starts at the stream-count header.
    dest.seek(0)
    pack_reader = StreamPackReader(dest)
    outputs = [io.BytesIO() for _ in range(pack_reader.stream_count)]
    print(f"Outputs: {outputs}")
    pack_reader.receive(outputs)
    for idx, stream in enumerate(outputs):
        print(f"Stream {idx}: {stream.getvalue()}")