
Commit

Fixed a bug when writing ds to HDFS if the DataPoint list has len == 1
nasirali1 committed Feb 22, 2018
1 parent f8d806c commit b397c61
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions cerebralcortex/core/data_manager/raw/stream_handler.py
@@ -26,6 +26,7 @@
import json
import uuid
import pyarrow
import pickle
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import List
@@ -364,6 +365,14 @@ def write_hdfs_day_file(self, participant_id: uuid, stream_id: uuid, data: DataP
if day is None:
day = row[2]
chunked_data.append(row)
if len(data)==1:
filename = self.raw_files_dir+str(participant_id)+"/"+str(stream_id)+"/"+str(day)+".pickle"
try:
with hdfs.open(filename, "wb") as f:
pickle.dump(chunked_data, f)
except Exception as ex:
self.logging.log(error_message="Error in writing data to HDFS. STREAM ID: " + str(stream_id)+ "Owner ID: " + str(participant_id)+ "Files: " + str(filename)+" - Exception: "+str(ex), error_type=self.logtypes.DEBUG)

elif day!=row[2]:
filename = self.raw_files_dir+str(participant_id)+"/"+str(stream_id)+"/"+str(day)+".pickle"
# if file exist then, retrieve, deserialize, concatenate, serialize again, and store
@@ -373,13 +382,14 @@
if existing_data is not None:
existing_data = serialize_obj(existing_data)
chunked_data.extend(existing_data)
chunked_data = list(set(chunked_data)) # remove duplicate
#chunked_data = list(set(chunked_data)) # remove duplicate
try:
with hdfs.open(filename, "wb") as f:
serialize_obj(chunked_data, f)
pickle.dump(chunked_data, f)
except Exception as ex:
self.logging.log(
error_message="Error in writing data to HDFS. STREAM ID: " + str(stream_id)+ "Owner ID: " + str(participant_id)+ "Files: " + str(filename)+" - Exception: "+str(ex), error_type=self.logtypes.DEBUG)

day = row[2]
chunked_data =[]
chunked_data.append(row)
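For context, here is a minimal standalone sketch of the write pattern this commit adds (pickling the accumulated chunk straight into a per-day file on HDFS). The connection setup and the placeholder values for raw_files_dir, participant_id, stream_id, day, and chunked_data are assumptions for illustration only; they are not part of this commit, which only changes the body of write_hdfs_day_file.

import pickle
import pyarrow

# Assumption: cluster-specific connection details are not shown anywhere in this commit.
hdfs = pyarrow.hdfs.connect(host="default", port=0)

# Hypothetical placeholders mirroring the variables used in the diff.
raw_files_dir = "/cerebralcortex/raw/"
participant_id = "00000000-0000-0000-0000-000000000000"
stream_id = "11111111-1111-1111-1111-111111111111"
day = "20180222"
chunked_data = [(participant_id, stream_id, day)]  # rows gathered from the DataPoint list

filename = raw_files_dir + str(participant_id) + "/" + str(stream_id) + "/" + str(day) + ".pickle"

try:
    # Same pattern as the added len(data)==1 branch: pickle the chunk into the day file.
    with hdfs.open(filename, "wb") as f:
        pickle.dump(chunked_data, f)
except Exception as ex:
    print("Error in writing data to HDFS: " + str(filename) + " - Exception: " + str(ex))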
