Fix kafka when using time intervals in the config
bmoscon committed Aug 18, 2020
1 parent c9c56b6 commit 47d809e
Showing 3 changed files with 32 additions and 6 deletions.
config.yaml: 6 changes (5 additions & 1 deletion)
@@ -109,7 +109,7 @@ parquet:
codec: 'BROTLI'
level: 6
# Path controls where the file is written (if not using S3/GC). If null, will write to CWD. Must be an absolute path. The path must also exist; Cryptostore will not
# create the path.
# create the path.
path: null


@@ -144,6 +144,10 @@ arctic: mongodb://127.0.0.1
# Data batching window, in seconds, or optionally a time interval: M(inutely), H(ourly), D(aily).
# These intervals require that you have enough memory available to Redis (and Python) to hold this amount of data.
# String intervals can be combined with numbers, e.g. 2H is 2 hours, 5M is 5 minutes, etc.
# Note that if a time interval is selected and Kafka is used, the timestamps used to aggregate the data
# for the time intervals come from Kafka's internal timestamp offsets. These will differ from the exchange
# provided timestamps by anywhere from 0.001 ms to several seconds, depending on your Kafka cluster and
# other hardware and setup configuration.
storage_interval: 60
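
The read() change in kafka.py below takes start and end bounds in epoch milliseconds, which is the unit Kafka's offsets_for_times API expects. As a rough illustration of how an interval string such as 2H could map onto such a window, here is a minimal sketch; the helper names and the boundary-alignment rule are assumptions made for illustration, not Cryptostore's actual parsing code.

import time

# Hypothetical sketch: map an interval string like '2H' or '5M' (or a bare
# number of seconds) to a [start, end) window in epoch milliseconds, the unit
# expected by Kafka's offsets_for_times. Not Cryptostore's actual parsing.
UNITS_MS = {'M': 60_000, 'H': 3_600_000, 'D': 86_400_000}

def interval_to_ms(interval):
    unit = interval[-1].upper()
    if unit in UNITS_MS:
        count = int(interval[:-1]) if interval[:-1] else 1
        return count * UNITS_MS[unit]
    return int(interval) * 1000          # a plain number means seconds

def last_window(interval, now_ms=None):
    # Most recently completed [start, end) window, aligned to the interval width.
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    width = interval_to_ms(interval)
    end = (now_ms // width) * width
    return end - width, end

# Example: start, end = last_window('2H')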


cryptostore/aggregator/kafka.py: 31 changes (27 additions & 4 deletions)
@@ -47,20 +47,43 @@ def _conn(self, key):
        self.conn[key].subscribe([key])
        return self.conn[key]

    def read(self, exchange, dtype, pair):
    def read(self, exchange, dtype, pair, start=None, end=None):
        kafka = StorageEngines.confluent_kafka
        key = f'{dtype}-{exchange}-{pair}'
        data = self._conn(key).consume(1000000, timeout=0.5)

        if start and end:
            start_offset = self._conn(key).offsets_for_times([kafka.TopicPartition(key, 0, start)])[0]
            stop_offset = self._conn(key).offsets_for_times([kafka.TopicPartition(key, 0, end)])[0]
            if start_offset.offset == -1:
                return []

            self._conn(key).assign([start_offset])
            offset_diff = stop_offset.offset - start_offset.offset
            if offset_diff <= 0:
                return []

            data = self._conn(key).consume(offset_diff)
            self._conn(key).unassign()
        else:
            data = self._conn(key).consume(1000000, timeout=0.5)

        LOG.info("%s: Read %d messages from Kafka", key, len(data))
        ret = []

        for message in data:
            self.ids[key] = message
            update = json.loads(message.value().decode('utf8'))

            msg = message.value().decode('utf8')
            try:
                update = json.loads(msg)
            except:
                if 'Subscribed topic not available' in msg:
                    return ret
            if dtype in {L2_BOOK, L3_BOOK}:
                update = book_flatten(update, update['timestamp'], update['delta'])
                ret.extend(update)
            if dtype in {TRADES, TICKER, FUNDING, OPEN_INTEREST}:
                ret.append(update)

        return ret

    def delete(self, exchange, dtype, pair):
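
A self-contained sketch of the same time-window read pattern introduced in read() above: convert millisecond bounds to offsets with offsets_for_times, assign the partition at the starting offset, consume the offset difference, then unassign. The broker address, topic name, and group id below are placeholder assumptions, and this illustrates the confluent_kafka pattern rather than reproducing Cryptostore's code.

import json
from confluent_kafka import Consumer, TopicPartition

def read_window(topic, start_ms, end_ms):
    # Placeholder connection settings; adjust for your cluster.
    consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                         'group.id': 'cryptostore-example',
                         'enable.auto.commit': False})
    try:
        # offsets_for_times returns, per partition, the earliest offset whose
        # timestamp is >= the requested time (offset == -1 if there is none).
        start_tp = consumer.offsets_for_times([TopicPartition(topic, 0, start_ms)])[0]
        end_tp = consumer.offsets_for_times([TopicPartition(topic, 0, end_ms)])[0]
        if start_tp.offset == -1 or end_tp.offset - start_tp.offset <= 0:
            return []

        consumer.assign([start_tp])
        messages = consumer.consume(end_tp.offset - start_tp.offset, timeout=10.0)
        consumer.unassign()

        out = []
        for message in messages:
            if message.error():
                continue            # skip error events instead of JSON-decoding them
            out.append(json.loads(message.value().decode('utf8')))
        return out
    finally:
        consumer.close()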
cryptostore/data/parquet.py: 1 change (0 additions & 1 deletion)
@@ -91,7 +91,6 @@ def write(self, exchange, data_type, pair, timestamp):
                elif var == "pair":
                    file_name += f"{pair}-"
                else:
                    print(var)
                    raise ValueError("Invalid file format specified for parquet file")
            file_name = file_name[:-1] + ".parquet"
        else:
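
For context on the write() hunk above: the surrounding logic builds the parquet file name from a configurable list of fields, appending each value with a trailing dash and then swapping the final dash for the .parquet extension. A minimal sketch of that pattern follows; the field list and example values are assumptions for illustration, not taken from parquet.py.

def build_file_name(file_format, exchange, data_type, pair, timestamp):
    # Assumed field names for illustration; each value is appended with a '-'
    # and the trailing '-' is replaced with the '.parquet' extension.
    values = {'exchange': exchange, 'data_type': data_type,
              'pair': pair, 'timestamp': str(timestamp)}
    file_name = ''
    for var in file_format:
        if var not in values:
            raise ValueError("Invalid file format specified for parquet file")
        file_name += f"{values[var]}-"
    return file_name[:-1] + '.parquet'

# build_file_name(['exchange', 'data_type', 'pair', 'timestamp'],
#                 'COINBASE', 'trades', 'BTC-USD', 1597700000)
# -> 'COINBASE-trades-BTC-USD-1597700000.parquet'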
