From 8acb0cb7943e07bd81368b258e16d0ae3d93f173 Mon Sep 17 00:00:00 2001
From: nasirali1
Date: Thu, 22 Feb 2018 14:15:59 -0600
Subject: [PATCH] fixed a bug to write ds in hdfs. If datapoint has len==1 append file fixed

---
 cerebralcortex/core/data_manager/raw/stream_handler.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/cerebralcortex/core/data_manager/raw/stream_handler.py b/cerebralcortex/core/data_manager/raw/stream_handler.py
index 154c41d..028ac66 100644
--- a/cerebralcortex/core/data_manager/raw/stream_handler.py
+++ b/cerebralcortex/core/data_manager/raw/stream_handler.py
@@ -368,6 +368,12 @@ def write_hdfs_day_file(self, participant_id: uuid, stream_id: uuid, data: DataP
         if len(data)==1:
             filename = self.raw_files_dir+str(participant_id)+"/"+str(stream_id)+"/"+str(day)+".pickle"
             try:
+                if hdfs.exists(filename):
+                    with hdfs.open(filename, "rb") as curfile:
+                        existing_data = curfile.read()
+                    if existing_data is not None:
+                        existing_data = deserialize_obj(existing_data)
+                        chunked_data.extend(existing_data)
                 with hdfs.open(filename, "wb") as f:
                     pickle.dump(chunked_data, f)
             except Exception as ex:
@@ -380,7 +386,7 @@ def write_hdfs_day_file(self, participant_id: uuid, stream_id: uuid, data: DataP
                 with hdfs.open(filename, "rb") as curfile:
                     existing_data = curfile.read()
                 if existing_data is not None:
-                    existing_data = serialize_obj(existing_data)
+                    existing_data = deserialize_obj(existing_data)
                     chunked_data.extend(existing_data)
                 #chunked_data = list(set(chunked_data)) # remove duplicate
                 try:
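
For reference, below is a minimal sketch of the read-deserialize-extend-write pattern this patch introduces. Plain local file I/O and pickle.loads stand in for the hdfs client and deserialize_obj used in stream_handler.py, and the helper name append_day_file is illustrative only, not part of the CerebralCortex API.

    import os
    import pickle

    def append_day_file(filename, new_datapoints):
        # Merge new datapoints into an existing pickled day file, then rewrite it.
        # Mirrors the fix: read the current file, deserialize it, extend the new
        # chunk with the old contents, and only then overwrite the file.
        chunked_data = list(new_datapoints)
        if os.path.exists(filename):              # hdfs.exists(filename) in the patch
            with open(filename, "rb") as curfile: # hdfs.open(..., "rb") in the patch
                raw = curfile.read()
            if raw:
                existing_data = pickle.loads(raw) # deserialize_obj(existing_data) in the patch
                chunked_data.extend(existing_data)
        with open(filename, "wb") as f:           # hdfs.open(..., "wb") in the patch
            pickle.dump(chunked_data, f)

    # Usage: two writes to the same day file accumulate both datapoints instead of
    # the second write clobbering the first (the behavior the patch fixes).
    if os.path.exists("/tmp/day.pickle"):
        os.remove("/tmp/day.pickle")
    append_day_file("/tmp/day.pickle", [{"t": 1, "v": 10}])
    append_day_file("/tmp/day.pickle", [{"t": 2, "v": 20}])
    with open("/tmp/day.pickle", "rb") as f:
        print(pickle.load(f))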