From b397c618fd8c08b7152234ef87935b410bf21052 Mon Sep 17 00:00:00 2001
From: nasirali1
Date: Thu, 22 Feb 2018 13:49:11 -0600
Subject: [PATCH] fixed a bug to write ds in hdfs. If datapoint has len==1

---
 .../core/data_manager/raw/stream_handler.py | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/cerebralcortex/core/data_manager/raw/stream_handler.py b/cerebralcortex/core/data_manager/raw/stream_handler.py
index 0160bc0..154c41d 100644
--- a/cerebralcortex/core/data_manager/raw/stream_handler.py
+++ b/cerebralcortex/core/data_manager/raw/stream_handler.py
@@ -26,6 +26,7 @@
 import json
 import uuid
 import pyarrow
+import pickle
 from datetime import datetime, timedelta, timezone
 from enum import Enum
 from typing import List
@@ -364,6 +365,14 @@ def write_hdfs_day_file(self, participant_id: uuid, stream_id: uuid, data: DataP
             if day is None:
                 day = row[2]
             chunked_data.append(row)
+            if len(data)==1:
+                filename = self.raw_files_dir+str(participant_id)+"/"+str(stream_id)+"/"+str(day)+".pickle"
+                try:
+                    with hdfs.open(filename, "wb") as f:
+                        pickle.dump(chunked_data, f)
+                except Exception as ex:
+                    self.logging.log(error_message="Error in writing data to HDFS. STREAM ID: " + str(stream_id)+ "Owner ID: " + str(participant_id)+ "Files: " + str(filename)+" - Exception: "+str(ex), error_type=self.logtypes.DEBUG)
+
             elif day!=row[2]:
                 filename = self.raw_files_dir+str(participant_id)+"/"+str(stream_id)+"/"+str(day)+".pickle"
                 # if file exist then, retrieve, deserialize, concatenate, serialize again, and store
@@ -373,13 +382,14 @@ def write_hdfs_day_file(self, participant_id: uuid, stream_id: uuid, data: DataP
                     if existing_data is not None:
                         existing_data = serialize_obj(existing_data)
                         chunked_data.extend(existing_data)
-                    chunked_data = list(set(chunked_data)) # remove duplicate
+                    #chunked_data = list(set(chunked_data)) # remove duplicate
                     try:
                         with hdfs.open(filename, "wb") as f:
-                            serialize_obj(chunked_data, f)
+                            pickle.dump(chunked_data, f)
                     except Exception as ex:
                         self.logging.log(
                             error_message="Error in writing data to HDFS. STREAM ID: " + str(stream_id)+ "Owner ID: " + str(participant_id)+ "Files: " + str(filename)+" - Exception: "+str(ex), error_type=self.logtypes.DEBUG)
+
                 day = row[2]
                 chunked_data =[]
                 chunked_data.append(row)
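
Note on the change, for readers following along: after this patch a day's worth of rows is written to HDFS as a plain pickle at <raw_files_dir>/<participant_id>/<stream_id>/<day>.pickle, both in the new len(data)==1 branch and in the existing per-day branch, where pickle.dump on the open HDFS handle replaces the earlier serialize_obj call. The sketch below is a minimal, self-contained illustration of that write path only; the helper name write_day_pickle, the connect() parameters, and the sample arguments are assumptions for illustration and are not part of the patch.

import pickle
import pyarrow

def write_day_pickle(hdfs, raw_files_dir, participant_id, stream_id, day, chunked_data):
    """Pickle one day's chunk of rows to HDFS, mirroring write_hdfs_day_file.

    `hdfs` is expected to be a pyarrow HDFS client (e.g. pyarrow.hdfs.connect(...)),
    whose open() returns a writable binary file object that pickle.dump can use.
    """
    filename = raw_files_dir + str(participant_id) + "/" + str(stream_id) + "/" + str(day) + ".pickle"
    try:
        # Serialize the whole chunk in one shot, as the patched code does.
        with hdfs.open(filename, "wb") as f:
            pickle.dump(chunked_data, f)
    except Exception as ex:
        # The real code routes this through self.logging.log(...); print is a stand-in here.
        print("Error in writing data to HDFS. File: " + filename + " - Exception: " + str(ex))

# Hypothetical usage (requires a reachable HDFS cluster; host/port are placeholders):
# hdfs = pyarrow.hdfs.connect(host="localhost", port=8020)
# write_day_pickle(hdfs, "/cerebralcortex/raw/", participant_id, stream_id, "20180222", rows)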