diff --git a/cerebralcortex/algorithms/__init__.py b/cerebralcortex/algorithms/__init__.py
index 59e2ddb..2767757 100644
--- a/cerebralcortex/algorithms/__init__.py
+++ b/cerebralcortex/algorithms/__init__.py
@@ -1,2 +1,8 @@
 from cerebralcortex.algorithms.gps import gps_clusters
-__all__ = ["gps_clusters"]
\ No newline at end of file
+from cerebralcortex.algorithms.ecg.ecg_signal_processing import process_ecg
+from cerebralcortex.algorithms.stress_prediction.stress_prediction import stress_prediction
+from cerebralcortex.algorithms.stress_prediction.stress_episodes import stress_episodes_estimation
+from cerebralcortex.algorithms.rr_intervals.rr_interval_feature_extraction import rr_interval_feature_extraction
+
+__all__ = ["gps_clusters","process_ecg", "rr_interval_feature_extraction", "stress_prediction", "stress_episodes_estimation"]
+
diff --git a/cerebralcortex/algorithms/ecg/__init__.py b/cerebralcortex/algorithms/ecg/__init__.py
new file mode 100644
index 0000000..a5d3d18
--- /dev/null
+++ b/cerebralcortex/algorithms/ecg/__init__.py
@@ -0,0 +1,2 @@
+from cerebralcortex.algorithms.ecg.ecg_signal_processing import process_ecg
+__all__ = ["process_ecg"]
diff --git a/cerebralcortex/algorithms/ecg/ecg_signal_processing.py b/cerebralcortex/algorithms/ecg/ecg_signal_processing.py
new file mode 100644
index 0000000..2fa9218
--- /dev/null
+++ b/cerebralcortex/algorithms/ecg/ecg_signal_processing.py
@@ -0,0 +1,460 @@
+# Copyright (c) 2017, MD2K Center of Excellence
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright notice, this
+# list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
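+#
+# This module implements ECG R-peak detection based on the Pan-Tompkins
+# algorithm, converts the detected peaks into RR intervals, and filters
+# RR-interval outliers using the criterion-beat-difference approach of
+# Berntson et al. (1990).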
+from typing import List +import numpy as np +from scipy import signal +from scipy.stats import iqr +from enum import Enum +from pyspark.sql.types import StructField, StructType, StringType, FloatType, TimestampType +from pyspark.sql.functions import pandas_udf, PandasUDFType +import pandas as pd + +''' +from core.signalprocessing.dataquality import Quality +from cerebralcortex.core.datatypes.datapoint import DataPoint +from cerebralcortex.core.datatypes.datastream import DataStream +''' + +def rr_interval_update(rpeak_temp1, + rr_ave: float, + min_size: int = 8) -> float: + """ + :param min_size: 8 last R-peaks are checked to compute the running rr interval average + :param rpeak_temp1: R peak locations + :param rr_ave: previous rr-interval average + :return: the new rr-interval average of the previously detected 8 R peak locations + """ + peak_interval = np.diff([0] + rpeak_temp1) # TODO: rpeak_temp1 is a datapoint, what should this be converted to? + return rr_ave if len(peak_interval) < min_size else np.sum(peak_interval[-min_size:]) / min_size + + +def compute_moving_window_int(sample: np.ndarray, + fs: float, + blackman_win_length: int, + filter_length: int = 257, + delta: float = .02) -> np.ndarray: + """ + :param sample: ecg sample array + :param fs: sampling frequency + :param blackman_win_length: length of the blackman window on which to compute the moving window integration + :param filter_length: length of the FIR bandpass filter on which filtering is done on ecg sample array + :param delta: to compute the weights of each band in FIR filter + :return: the Moving window integration of the sample array + """ + # I believe these constants can be kept in a file + + # filter edges + filter_edges = [0, 4.5 * 2 / fs, 5 * 2 / fs, 20 * 2 / fs, 20.5 * 2 / fs, 1] + # gains at filter band edges + gains = [0, 0, 1, 1, 0, 0] + # weights + weights = [500 / delta, 1 / delta, 500 / delta] + # length of the FIR filter + + # FIR filter coefficients for bandpass filtering + filter_coeff = signal.firls(filter_length, filter_edges, gains, weights) + + # bandpass filtered signal + bandpass_signal = signal.convolve(sample, filter_coeff, 'same') + bandpass_signal /= np.percentile(bandpass_signal, 90) + + # derivative array + derivative_array = (np.array([-1.0, -2.0, 0, 2.0, 1.0])) * (1 / 8) + # derivative signal (differentiation of the bandpass) + derivative_signal = signal.convolve(bandpass_signal, derivative_array, 'same') + derivative_signal /= np.percentile(derivative_signal, 90) + + # squared derivative signal + derivative_squared_signal = derivative_signal ** 2 + derivative_squared_signal /= np.percentile(derivative_squared_signal, 90) + + # blackman window + blackman_window = np.blackman(blackman_win_length) + # moving window Integration of squared derivative signal + mov_win_int_signal = signal.convolve(derivative_squared_signal, blackman_window, 'same') + mov_win_int_signal /= np.percentile(mov_win_int_signal, 90) + + return mov_win_int_signal + + +def check_peak(data) -> bool: + """ + This is a function to check the condition of a simple peak of signal y in index i + :param data: + :return: + """ + + if len(data) < 3: + return False + + midpoint = int(len(data) / 2) + test_value = data[0] + + for i in data[1:midpoint + 1]: + if test_value < i: + test_value = i + else: + return False + + for i in data[midpoint + 1:]: + if test_value > i: + test_value = i + else: + return False + + return True + + +# TODO: CODE_REVIEW: Justify in the method documentation string the justification of the default 
values +# TODO: CODE_REVIEW: Make hard-coded constants default method parameter +def compute_r_peaks(threshold_1: float, + rr_ave: float, + mov_win_int_signal: np.ndarray, + peak_tuple_array: List[tuple]) -> list: + """ + This function does the adaptive thresholding of the signal to get the R-peak locations + + + :param threshold_1: Thr1 is the threshold above which the R peak + :param rr_ave: running RR-interval average + :param mov_win_int_signal: signal sample array + :param peak_tuple_array: A tuple array containing location and values of the simple peaks detected in the process before + + :returns rpeak_array_indices: The location of the R peaks in the signal sample array once found this is returned + + + """ + + peak_location_in_signal_array = [i[0] for i in peak_tuple_array] # location of the simple peaks in signal array + amplitude_in_peak_locations = [i[1] for i in peak_tuple_array] # simple peak's amplitude in signal array + + threshold_2 = 0.5 * threshold_1 # any signal value between threshold_2 and threshold_1 is a noise peak + sig_lev = 4 * threshold_1 # current signal level -any signal above thrice the signal level is discarded as a spurious value + noise_lev = 0.1 * sig_lev # current noise level of the signal + ind_rpeak = 0 + rpeak_array_indices = [] + rpeak_inds_in_peak_array = [] + while ind_rpeak < len(peak_location_in_signal_array): + + # if for 166 percent of the present RR interval no peak is detected as R peak then threshold_2 is taken as the + # R peak threshold and the maximum of the range is taken as a R peak and RR interval is updated accordingly + if len(rpeak_array_indices) >= 1 and peak_location_in_signal_array[ind_rpeak] - peak_location_in_signal_array[ + rpeak_inds_in_peak_array[-1]] > 1.66 * rr_ave and ind_rpeak - rpeak_inds_in_peak_array[-1] > 1: + + # values and indexes of previous peaks discarded as not an R peak whose magnitude is above threshold_2 + searchback_array = [(k - rpeak_inds_in_peak_array[-1], amplitude_in_peak_locations[k]) for k in + range(rpeak_inds_in_peak_array[-1] + 1, ind_rpeak) if + 3 * sig_lev > amplitude_in_peak_locations[k] > threshold_2] + + if len(searchback_array) > 0: + # maximum inside the range calculated beforehand is taken as R peak + searchback_array_inrange_values = [x[1] for x in searchback_array] + searchback_max_index = np.argmax(searchback_array_inrange_values) + rpeak_array_indices.append(peak_location_in_signal_array[ + rpeak_inds_in_peak_array[-1] + searchback_array[searchback_max_index][ + 0]]) + rpeak_inds_in_peak_array.append( + rpeak_inds_in_peak_array[-1] + searchback_array[searchback_max_index][0]) + sig_lev = ewma(sig_lev, mov_win_int_signal[peak_location_in_signal_array[ind_rpeak]], + .125) # update the current signal level + threshold_1 = noise_lev + 0.25 * (sig_lev - noise_lev) + threshold_2 = 0.5 * threshold_1 + rr_ave = rr_interval_update(rpeak_array_indices, rr_ave) + ind_rpeak = rpeak_inds_in_peak_array[-1] + 1 + else: + threshold_1 = noise_lev + 0.25 * (sig_lev - noise_lev) + threshold_2 = 0.5 * threshold_1 + ind_rpeak += 1 + else: + # R peak checking + if threshold_1 <= mov_win_int_signal[peak_location_in_signal_array[ind_rpeak]] < 3 * sig_lev: + rpeak_array_indices.append(peak_location_in_signal_array[ind_rpeak]) + rpeak_inds_in_peak_array.append(ind_rpeak) + sig_lev = ewma(sig_lev, mov_win_int_signal[peak_location_in_signal_array[ind_rpeak]], + .125) # update the signal level + # noise peak checking + elif threshold_1 > mov_win_int_signal[peak_location_in_signal_array[ind_rpeak]] > threshold_2: + 
noise_lev = ewma(noise_lev, mov_win_int_signal[peak_location_in_signal_array[ind_rpeak]], + .125) # update the noise level + threshold_1 = noise_lev + 0.25 * (sig_lev - noise_lev) + threshold_2 = 0.5 * threshold_1 + ind_rpeak += 1 + rr_ave = rr_interval_update(rpeak_array_indices, rr_ave) + return rpeak_array_indices + + +def ewma(value: float, new_value: float, alpha: float) -> float: + """ + + :param value: + :param new_value: + :param alpha: + :return: + """ + return alpha * new_value + (1 - alpha) * value + + +# TODO: CODE_REVIEW: Justify in the method documentation string the justification of the default values +# TODO: CODE_REVIEW: Make hard-coded constants default method parameter +def remove_close_peaks(rpeak_temp1: list, + sample: np.ndarray, + fs: float, + min_range: float = .5) -> list: + """ + This function removes one of two peaks from two consecutive R peaks + if difference among them is less than the minimum possible + + :param min_range: + :param rpeak_temp1: R peak array containing the index of the R peaks + :param sample: sample array + :param fs: sampling frequency + :return: R peak array with no close R peaks + + """ + difference = 0 + rpeak_temp2 = rpeak_temp1 + while difference != 1: + length_rpeak_temp2 = len(rpeak_temp2) + temp = np.diff(rpeak_temp2) + comp_index1 = [rpeak_temp2[i] for i in range(len(temp)) if temp[i] < min_range * fs] + comp_index2 = [rpeak_temp2[i + 1] for i in range(len(temp)) if temp[i] < min_range * fs] + comp1 = sample[comp_index1] + comp2 = sample[comp_index2] + checkmin = np.matrix([comp1, comp2]) + temp_ind1 = [i for i in range(len(temp)) if temp[i] < min_range * fs] + temp_ind2 = np.argmin(np.array(checkmin), axis=0) + temp_ind = temp_ind1 + temp_ind2 + temp_ind = np.unique(temp_ind) + count = 0 + for i in temp_ind: + rpeak_temp2.remove(rpeak_temp2[i - count]) + count = count + 1 + difference = length_rpeak_temp2 - len(rpeak_temp2) + 1 + return rpeak_temp2 + + +def confirm_peaks(rpeak_temp1: list, + sample: np.ndarray, + fs: float, + range_for_checking: float = 1 / 10) -> np.ndarray: + """ + + This function does the final check on the R peaks detected and + finds the maximum in a range of fs/10 of the detected peak location and assigns it to be the peak + + :param rpeak_temp1: R peak array containing the index of the R peaks + :param sample: sample array + :param fs: sampling frequency + :param range_for_checking : The peaks are checked within a range of fs/10 to get the maximum value within that range + + :return: final R peak array + + """ + for i in range(1, len(rpeak_temp1) - 1): + start_index = int(rpeak_temp1[i] - np.ceil(range_for_checking * fs)) + end_index = int(rpeak_temp1[i] + np.ceil(range_for_checking * fs) + 1) + + index = np.argmax(sample[start_index:end_index]) + + rpeak_temp1[i] = rpeak_temp1[i] - np.ceil(range_for_checking * fs) + index + + return np.array(rpeak_temp1).astype(np.int64) + + +# TODO: CODE_REVIEW: Make hard-coded constants default method parameter +def detect_rpeak(ecg , + fs: float = 64, + threshold: float = 0.5, + blackman_win_len_range: float = 0.2): + """ + This program implements the Pan Tomkins algorithm on ECG signal to detect the R peaks + + Since the ecg array can have discontinuity in the timestamp arrays the rr-interval calculated + in the algorithm is calculated in terms of the index in the sample array + + The algorithm consists of some major steps + + 1. computation of the moving window integration of the signal in terms of blackman window of a prescribed length + 2. 
compute all the peaks of the moving window integration signal + 3. adaptive thresholding with dynamic signal and noise thresholds applied to filter out the R peak locations + 4. confirm the R peaks through differentiation from the nearby peaks and remove the false peaks + + :param ecg: ecg array of tuples (timestamp,value) + :param fs: sampling frequency + :param threshold: initial threshold to detect the R peak in a signal normalized by the 90th percentile. .5 is default. + :param blackman_win_len_range : the range to calculate blackman window length + + :return: R peak array of tuples (timestamp, Rpeak interval) + """ + + ecg_vals = ecg['ecg'].values + ecg_timestamps = ecg['timestamp'].values + + sample = np.array([i for i in ecg_vals]) + timestamp = np.array([i for i in ecg_timestamps]) + + # computes the moving window integration of the signal + blackman_win_len = np.ceil(fs * blackman_win_len_range) + y = compute_moving_window_int(sample, fs, blackman_win_len) + + peak_location_values = [(i, y[i]) for i in range(2, len(y) - 1) if check_peak(y[i - 2:i + 3])] + + # initial RR interval average + peak_location = [i[0] for i in peak_location_values] + running_rr_avg = sum(np.diff(peak_location)) / (len(peak_location) - 1) + + rpeak_temp1 = compute_r_peaks(threshold, running_rr_avg, y, peak_location_values) + rpeak_temp2 = remove_close_peaks(rpeak_temp1, sample, fs) + index = confirm_peaks(rpeak_temp2, sample, fs) + + rpeak_timestamp = timestamp[index] + rpeak_value = np.diff(rpeak_timestamp) + rpeak_timestamp = rpeak_timestamp[1:] + + result_data = [] + for k in range(len(rpeak_value)): + result_data.append( + (rpeak_timestamp[k], rpeak_value[k]/np.timedelta64(1, 's') + rpeak_value[k]/np.timedelta64(1, 'us') / 1e6)) + + # Create resulting datastream to be returned + + filtered_data = compute_outlier_ecg(result_data) + + return filtered_data + + +class Quality(Enum): + ACCEPTABLE = 1 + UNACCEPTABLE = 0 + +def outlier_computation(valid_rr_interval_time: list, + valid_rr_interval_sample: list, + criterion_beat_difference: float): + """ + This function implements the rr interval outlier calculation through comparison with the criterion + beat difference and consecutive differences with the previous and next sample + + :param valid_rr_interval_time: A python array of rr interval time + :param valid_rr_interval_sample: A python array of rr interval samples + :param criterion_beat_difference: A threshold calculated from the RR interval data passed + + yields: The quality of each data point in the RR interval array + """ + standard_rr_interval_sample = valid_rr_interval_sample[0] + previous_rr_interval_quality = Quality.ACCEPTABLE + + for i in range(1, len(valid_rr_interval_sample) - 1): + + rr_interval_diff_with_last_good = abs(standard_rr_interval_sample - valid_rr_interval_sample[i]) + rr_interval_diff_with_prev_sample = abs(valid_rr_interval_sample[i - 1] - valid_rr_interval_sample[i]) + rr_interval_diff_with_next_sample = abs(valid_rr_interval_sample[i] - valid_rr_interval_sample[i + 1]) + + if previous_rr_interval_quality == Quality.UNACCEPTABLE and rr_interval_diff_with_last_good < criterion_beat_difference: + yield (valid_rr_interval_time[i], Quality.ACCEPTABLE) + previous_rr_interval_quality = Quality.ACCEPTABLE + standard_rr_interval_sample = valid_rr_interval_sample[i] + + elif previous_rr_interval_quality == Quality.UNACCEPTABLE and rr_interval_diff_with_last_good > criterion_beat_difference >= rr_interval_diff_with_prev_sample and rr_interval_diff_with_next_sample <= 
criterion_beat_difference: + yield (valid_rr_interval_time[i], Quality.ACCEPTABLE) + previous_rr_interval_quality = Quality.ACCEPTABLE + standard_rr_interval_sample = valid_rr_interval_sample[i] + + elif previous_rr_interval_quality == Quality.UNACCEPTABLE and rr_interval_diff_with_last_good > criterion_beat_difference and ( + rr_interval_diff_with_prev_sample > criterion_beat_difference or rr_interval_diff_with_next_sample > criterion_beat_difference): + yield (valid_rr_interval_time[i], Quality.UNACCEPTABLE) + previous_rr_interval_quality = Quality.UNACCEPTABLE + + elif previous_rr_interval_quality == Quality.ACCEPTABLE and rr_interval_diff_with_prev_sample <= criterion_beat_difference: + yield (valid_rr_interval_time[i], Quality.ACCEPTABLE) + previous_rr_interval_quality = Quality.ACCEPTABLE + standard_rr_interval_sample = valid_rr_interval_sample[i] + + elif previous_rr_interval_quality == Quality.ACCEPTABLE and rr_interval_diff_with_prev_sample > criterion_beat_difference: + yield (valid_rr_interval_time[i], Quality.UNACCEPTABLE) + previous_rr_interval_quality = Quality.UNACCEPTABLE + + else: + yield (valid_rr_interval_time[i], Quality.UNACCEPTABLE) + + +def compute_outlier_ecg(rr_intervals): + """ + Reference - Berntson, Gary G., et al. "An approach to artifact identification: Application to heart period data." + Psychophysiology 27.5 (1990): 586-598. + + :param ecg_rr: RR interval datastream + + :return: An annotated datastream specifying when the ECG RR interval datastream is acceptable + """ + + # print(1) + + valid_rr_interval_sample = [i[1] for i in rr_intervals if i[1] > .3 and i[1] < 2] + valid_rr_interval_time = [i[0] for i in rr_intervals if i[1] > .3 and i[1] < 2] + valid_rr_interval_difference = abs(np.diff(valid_rr_interval_sample)) + # plt.plot(valid_rr_interval_time,valid_rr_interval_sample) + # plt.show() + # Maximum Expected Difference(MED)= 3.32* Quartile Deviation + maximum_expected_difference = 4.5 * 0.5 * iqr(valid_rr_interval_difference) + + # Shortest Expected Beat(SEB) = Median Beat – 2.9 * Quartile Deviation + # Minimal Artifact Difference(MAD) = SEB/ 3 + maximum_artifact_difference = (np.median(valid_rr_interval_sample) - 2.9 * .5 * iqr( + valid_rr_interval_difference)) / 3 + + # Midway between MED and MAD is considered + criterion_beat_difference = (maximum_expected_difference + maximum_artifact_difference) / 2 + if criterion_beat_difference < .2: + criterion_beat_difference = .2 + + ecg_rr_quality_array = [(valid_rr_interval_time[0], valid_rr_interval_sample[0])] + + count = 1 + for data in outlier_computation(valid_rr_interval_time, valid_rr_interval_sample, criterion_beat_difference): + if(data[1] == Quality.ACCEPTABLE): + ecg_rr_quality_array.append((valid_rr_interval_time[count], valid_rr_interval_sample[count])) + count += 1 + + ecg_rr_quality_array.append((valid_rr_interval_time[-1], valid_rr_interval_sample[-1])) + + #TODO: check above filtering + + return ecg_rr_quality_array + + + +schema = StructType([ + StructField("user", StringType()), + StructField("timestamp", TimestampType()), + StructField("rr_interval", FloatType()), +]) + + +@pandas_udf(schema, PandasUDFType.GROUPED_MAP) +def process_ecg(data: object) -> object: + rri = detect_rpeak(data, 100) + df = pd.DataFrame(rri, columns=['timestamp', 'rr_interval']) + df.insert(0,'user',data['user'].values[0]) + return df + diff --git a/cerebralcortex/algorithms/gps/gps_clustering.py b/cerebralcortex/algorithms/gps/gps_clustering.py index 864bbdb..ddc1d5b 100644 --- 
a/cerebralcortex/algorithms/gps/gps_clustering.py +++ b/cerebralcortex/algorithms/gps/gps_clustering.py @@ -32,14 +32,14 @@ from pyspark.sql.types import StructField, StructType, StringType, FloatType -EPSILON_CONSTANT = 1000 +EPSILON_CONSTANT = 1000/100.0 LATITUDE = 0 LONGITUDE = 1 ACCURACY = -1 GPS_ACCURACY_THRESHOLD = 41.0 KM_PER_RADIAN = 6371.0088 GEO_FENCE_DISTANCE = 2 -MINIMUM_POINTS_IN_CLUSTER = 500 +MINIMUM_POINTS_IN_CLUSTER = 50 def get_centermost_point(cluster: object) -> object: """ diff --git a/cerebralcortex/algorithms/rr_intervals/__init__.py b/cerebralcortex/algorithms/rr_intervals/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cerebralcortex/algorithms/rr_intervals/rr_interval_feature_extraction.py b/cerebralcortex/algorithms/rr_intervals/rr_interval_feature_extraction.py new file mode 100644 index 0000000..b369b4c --- /dev/null +++ b/cerebralcortex/algorithms/rr_intervals/rr_interval_feature_extraction.py @@ -0,0 +1,218 @@ +# Copyright (c) 2017, MD2K Center of Excellence +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
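+#
+# This module computes per-minute heart-rate-variability features from RR
+# intervals: variance, VLF/LF/HF power (via a Lomb-Scargle periodogram),
+# LF/HF ratio, mean, median, quartile deviation, heart rate, and the 80th
+# and 20th percentiles.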
+from typing import List +import numpy as np +from scipy import signal +from scipy.stats import iqr +from scipy.stats.mstats_basic import winsorize +from enum import Enum +from pyspark.sql.types import StructField, StructType, StringType, FloatType, TimestampType, ArrayType +from pyspark.sql.functions import pandas_udf, PandasUDFType +import pandas as pd +import datetime + +def lomb(time_stamps:List, + samples:List, + low_frequency: float, + high_frequency: float): + """ + : Lomb–Scargle periodogram implementation + :param data: List[DataPoint] + :param high_frequency: float + :param low_frequency: float + :return lomb-scargle pgram and frequency values + """ + + frequency_range = np.linspace(low_frequency, high_frequency, len(time_stamps)) + result = signal.lombscargle(time_stamps, samples, frequency_range) + return result, frequency_range + + +def heart_rate_power(power: np.ndarray, + frequency: np.ndarray, + low_rate: float, + high_rate: float): + """ + Compute Heart Rate Power for specific frequency range + :param power: np.ndarray + :param frequency: np.ndarray + :param high_rate: float + :param low_rate: float + :return: sum of power for the frequency range + """ + result_power = float(0.0) + for i, value in enumerate(power): + if low_rate <= frequency[i] <= high_rate: + result_power += value + return result_power + + + +def rr_feature_computation(timestamp:list, + value:list, + low_frequency: float = 0.01, + high_frequency: float = 0.7, + low_rate_vlf: float = 0.0009, + high_rate_vlf: float = 0.04, + low_rate_hf: float = 0.15, + high_rate_hf: float = 0.4, + low_rate_lf: float = 0.04, + high_rate_lf: float = 0.15): + """ + ECG Feature Implementation. The frequency ranges for High, Low and Very low heart rate variability values are + derived from the following paper: + 'Heart rate variability: standards of measurement, physiological interpretation and clinical use' + :param high_rate_lf: float + :param low_rate_lf: float + :param high_rate_hf: float + :param low_rate_hf: float + :param high_rate_vlf: float + :param low_rate_vlf: float + :param high_frequency: float + :param low_frequency: float + :param datastream: DataStream + :param window_size: float + :param window_offset: float + :return: ECG Feature DataStreams + """ + + + # perform windowing of datastream + + + # initialize each ecg feature array + + rr_variance_data = [] + rr_mean_data = [] + rr_median_data = [] + rr_80percentile_data = [] + rr_20percentile_data = [] + rr_quartile_deviation_data = [] + rr_HF_data = [] + rr_LF_data = [] + rr_VLF_data = [] + rr_LF_HF_data = [] + rr_heart_rate_data = [] + + # iterate over each window and calculate features + + + reference_data = value + + rr_variance_data.append(np.var(reference_data)) + + power, frequency = lomb(time_stamps=timestamp,samples=value,low_frequency=low_frequency, high_frequency=high_frequency) + + rr_VLF_data.append(heart_rate_power(power, frequency, low_rate_vlf, high_rate_vlf)) + + rr_HF_data.append(heart_rate_power(power, frequency, low_rate_hf, high_rate_hf)) + + rr_LF_data.append(heart_rate_power(power,frequency,low_rate_lf,high_rate_lf)) + + if heart_rate_power(power, frequency, low_rate_hf, high_rate_hf) != 0: + lf_hf = float(heart_rate_power(power, frequency, low_rate_lf, high_rate_lf) / heart_rate_power(power, + frequency, + low_rate_hf, + high_rate_hf)) + rr_LF_HF_data.append(lf_hf) + else: + rr_LF_HF_data.append(0) + + rr_mean_data.append(np.mean(reference_data)) + rr_median_data.append(np.median(reference_data)) + 
rr_quartile_deviation_data.append((0.5*(np.percentile(reference_data, 75) - np.percentile(reference_data,25)))) + rr_heart_rate_data.append(np.median(60000/reference_data)) + + return [rr_variance_data[0], rr_VLF_data[0], rr_HF_data[0], rr_LF_data[0], rr_LF_HF_data[0],\ + rr_mean_data[0], rr_median_data[0], rr_quartile_deviation_data[0], rr_heart_rate_data[0],\ + np.percentile(value,80),np.percentile(value,20)] + + +def get_windows(data): + window_col,ts_col = [],[] + rr_interval = data.sort_values(by=['timestamp']) + st = (rr_interval['timestamp'].values[0].astype('int64')/1e9)*1000 + et = (rr_interval['timestamp'].values[-1].astype('int64')/1e9)*1000 + ts_array = np.arange(st,et,60000) + + x = [(i.astype('int64')/1e9)*1000 for i in rr_interval['timestamp'].values] + y = [i for i in rr_interval['rr_interval'].values] + #tmp_ts = np.array(x) + #tmp_rri = np.array(y) + + tmp_rri = np.zeros((len(x),2)) + for c in range(len(x)): + tmp_rri[c][0] = x[c] + tmp_rri[c][1] = y[c] + + for t in ts_array: + #index = np.where((tmp_ts >= t) & (tmp_ts <= t+60000))[0] + index = np.where((tmp_rri[:,0]>=t)&(tmp_rri[:,0]<=t+60000))[0] + + if len(index)>100 or len(index)<20: + continue + + window_col.append(tmp_rri[index,:]) + #window_col.append([[tmp_ts[i], tmp_rri[i]] for i in index]) + ts_col.append(t) + return window_col,ts_col + +def combine_data(window_col): + feature_matrix = np.zeros((0,11)) + for i,item in enumerate(window_col): + feature = rr_feature_computation(item[:,0],item[:,1]) + feature_matrix = np.concatenate((feature_matrix,np.array(feature).reshape(-1,11))) + return feature_matrix + + + +schema = StructType([ + StructField("user", StringType()), + StructField("timestamp", TimestampType()), + StructField("rr_feature", ArrayType(FloatType())), +]) + + +@pandas_udf(schema, PandasUDFType.GROUPED_MAP) +def rr_interval_feature_extraction(data: object) -> object: + winsor_limit = 0.1 #FIXME - this must be passed or configurable + + mean = data['rr_interval'].mean() + std = data['rr_interval'].std() + + data['rr_interval'] = (data['rr_interval'] - mean)/std + + window_col, ts_col = get_windows(data) + X = combine_data(window_col) + for k in range(X.shape[1]): + X[:,k] = winsorize(X[:,k],limits=[winsor_limit,winsor_limit]) + + + df = pd.DataFrame(index = np.arange(0, len(ts_col)), columns=['user', 'timestamp', 'rr_feature']) + user = data['user'].values[0] + for c in range(len(ts_col)): + df.loc[c] = [user, np.datetime64(int(ts_col[c]), 'ms'), X[c]] + + return df + diff --git a/cerebralcortex/algorithms/stress_prediction/__init__.py b/cerebralcortex/algorithms/stress_prediction/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cerebralcortex/algorithms/stress_prediction/stress_episodes.py b/cerebralcortex/algorithms/stress_prediction/stress_episodes.py new file mode 100644 index 0000000..25ababf --- /dev/null +++ b/cerebralcortex/algorithms/stress_prediction/stress_episodes.py @@ -0,0 +1,258 @@ +# Copyright (c) 2017, MD2K Center of Excellence +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. 
+# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from typing import List +import numpy as np +from scipy import signal +from scipy.stats import iqr +from scipy.stats.mstats_basic import winsorize +from enum import Enum +from pyspark.sql.types import StructField, StructType, StringType, FloatType, TimestampType, ArrayType +from pyspark.sql.functions import pandas_udf, PandasUDFType +import pandas as pd +import datetime + +NOTSTRESS = "NOTSTRESS" +UNSURE = 'UNSURE' +YESSTRESS = 'YESSTRESS' +UNKNOWN = 'UNKNOWN' +NOTCLASSIFIED = 'NOTCLASSIFIED' + +schema = StructType([ + StructField("user", StringType()), + StructField("timestamp", TimestampType()), + StructField("stress_episode", StringType()), +]) + +@pandas_udf(schema, PandasUDFType.GROUPED_MAP) +def stress_episodes_estimation(stress_data: object) -> object: + # --- Constants definitions --- + smoothing_window = 3 # FIXME - 3 minutes + macd_param_fast = 7; + macd_param_slow = 19; + macd_param_signal = 2; + threshold_yes = 0.36; + threshold_no = 0.36; + + data = impute(stress_data) + + # Smooth the stress values + stress_smoothed_list = [] + + for c in range(2,len(data['stress_probability'].values)): + smoothed_stress = (data['stress_probability'].values[c] + \ + data['stress_probability'].values[c-1] + \ + data['stress_probability'].values[c-2]) / smoothing_window + stress_smoothed_list.append((data['timestamp'].values[c], smoothed_stress)) + + ema_fast_list = [] + ema_fast_list.append(stress_smoothed_list[0]) + ema_slow_list = [] + ema_slow_list.append(stress_smoothed_list[0]) + ema_signal_list = [] + ema_signal_list.append((0,0)) + histogram_list = [] + + stress_episode_start = [] + stress_episode_peak = [] + stress_episode_classification = [] + stress_episode_intervals = [] + + for c in range(len(stress_smoothed_list)): + ema_fast_prev = ema_fast_list[-1][1] + ema_fast_current = stress_smoothed_list[c][1] + ema_fast = ewma(ema_fast_current, ema_fast_prev, 2.0/(macd_param_fast + 1)) + ema_fast_list.append((stress_smoothed_list[c][0], ema_fast)) + + ema_slow_prev = ema_slow_list[-1][1] + ema_slow_current = stress_smoothed_list[c][1] + ema_slow = ewma(ema_slow_current, ema_slow_prev, 2.0/(macd_param_slow + 1)) + ema_slow_list.append((stress_smoothed_list[c][0], ema_slow)) + + macd_prev = ema_fast_prev - ema_slow_prev + macd_current = ema_fast_current - ema_slow_current + ema_signal_prev = ema_signal_list[-1][1] + ema_signal = ewma(macd_current, macd_prev, 2.0/(macd_param_signal + 1)) + ema_signal_list.append((stress_smoothed_list[c][0], ema_signal)) + + histogram_prev = macd_prev - ema_signal_prev + histogram = macd_current - ema_signal + histogram_list.append((stress_smoothed_list[c][0], histogram)) + + if histogram_prev <=0 and histogram > 0: + # Episode has ended, started increasing again + start_timestamp = 
-1; + peak_timestamp = -1; + end_timestamp = stress_smoothed_list[c][0] + stress_class = -1 + if len(stress_episode_start): + start_timestamp = stress_episode_start[-1][0] + + if len(stress_episode_classification): + peak_timestamp = stress_episode_classification[-1][0] + stress_class = stress_episode_classification[-1][1] + + if stress_class != -1: + #print('Found full stress episode', stress_class) + #TODO - Handle this????? + stress_episode_timestamps = [] + stress_episode_timestamps.append(start_timestamp) + stress_episode_timestamps.append(peak_timestamp) + stress_episode_timestamps.append(end_timestamp) + stress_episode_timestamps.append(stress_class) + stress_episode_intervals.append(stress_episode_timestamps) + + if histogram_prev >=0 and histogram < 0: + # Episode is in the middle, started decreasing + episode_start_timestamp = get_episode_start_timestamp(stress_episode_classification, histogram_list, stress_smoothed_list[c][0]) + if episode_start_timestamp == -1: + stress_episode_start.append((episode_start_timestamp, NOTCLASSIFIED)) + stress_episode_peak.append((stress_smoothed_list[c][0], NOTCLASSIFIED)) + stress_episode_classification.append((stress_smoothed_list[c][0], NOTCLASSIFIED)) + else: + proportion_available = get_proportion_available(data, episode_start_timestamp, stress_smoothed_list[c][0]) + if proportion_available < 0.5: + stress_episode_start.append((episode_start_timestamp, UNKNOWN)) + stress_episode_peak.append((stress_smoothed_list[c][0], UNKNOWN)) + stress_episode_classification.append((stress_smoothed_list[c][0], UNKNOWN)) + else: + historical_stress = get_historical_values_timestamp_based(stress_smoothed_list, episode_start_timestamp, stress_smoothed_list[c][0]) + if not len(historical_stress): + stress_episode_start.append((episode_start_timestamp, UNKNOWN)) + stress_episode_peak.append((stress_smoothed_list[c][0], UNKNOWN)) + stress_episode_classification.append((stress_smoothed_list[c][0], UNKNOWN)) + else: + cumu_sum = 0.0 + for hs in historical_stress: + cumu_sum += hs[1] + stress_density = cumu_sum / len(historical_stress) + if stress_density >= threshold_yes: + stress_episode_start.append((episode_start_timestamp, YESSTRESS)) + stress_episode_peak.append((stress_smoothed_list[c][0], YESSTRESS)) + stress_episode_classification.append((stress_smoothed_list[c][0], YESSTRESS)) + elif stress_density <= threshold_no: + stress_episode_start.append((episode_start_timestamp, NOTSTRESS)) + stress_episode_peak.append((stress_smoothed_list[c][0], NOTSTRESS)) + stress_episode_classification.append((stress_smoothed_list[c][0], NOTSTRESS)) + else: + stress_episode_start.append((episode_start_timestamp, UNSURE)) + stress_episode_peak.append((stress_smoothed_list[c][0], UNSURE)) + stress_episode_classification.append((stress_smoothed_list[c][0], UNSURE)) + + + stress_episode_df = pd.DataFrame(index = np.arange(0, len(stress_episode_classification)), columns=['user', 'timestamp', 'stress_episode']) + user = data['user'].values[0] + index = 0 + for c in stress_episode_classification: + ts = c[0] + status = c[1] + stress_episode_df.loc[index] = [user, ts, status] + index += 1 + + return stress_episode_df + +def ewma(x, y, alpha): + return alpha * x + (1 - alpha) * y + +def get_episode_start_timestamp(stress_episode_classification, histogram_list, currenttime): + timestamp_prev = -1 + if len(stress_episode_classification) >= 3: + timestamp_prev = stress_episode_classification[-3][0] + elif len(stress_episode_classification) == 2: + timestamp_prev = 
stress_episode_classification[-2][0] + elif len(stress_episode_classification) == 1: + timestamp_prev = stress_episode_classification[-1][0] + + histogram_history = get_historical_values_timestamp_based(histogram_list, timestamp_prev, currenttime) + + if len(histogram_history) <= 1: + return -1 + + for x in range(len(histogram_history)-2 , -1, -1): + if histogram_history[x][1] <= 0: + return histogram_history[x+1][0] + + return histogram_history[0][0] + + +def get_historical_values_timestamp_based(data, start_timestamp, currenttime): + toreturn = [] + starttime = start_timestamp + if starttime == -1: + starttime = currenttime - np.timedelta64(100*365*24*3600, 's') # approx 100 year to approximate -1 + for c in data: + if c[0] >= starttime and c[0] <= currenttime: + toreturn.append(c) + if c[0] > currenttime: + break + + return toreturn + + +def get_proportion_available(data, st, current_timestamp): + count = 0 + available = 0 + start_timestamp = st + if start_timestamp == -1: + start_timestamp = current_timestamp - np.timedelta(100, 'Y') + for x in range(len(data)): + row_time = data.iloc[x]['timestamp'] + if row_time >= start_timestamp and row_time <= current_timestamp: + available += data.iloc[x]['available'] + count +=1 + if row_time > current_timestamp: + break + + if count: + return available/count + + return 0 + + +window = 60 # seconds FIXME TODO + +def impute(df): + df['available'] = 1 + missing_vals = pd.DataFrame(columns=df.columns) + + for x in range(1, len(df['timestamp'].values)): + diff = (df['timestamp'].values[x] - df['timestamp'].values[x-1])/np.timedelta64(1, 's')#1000000000 + if diff > 60: + num_rows_to_insert = int(diff/60) - 1 + available_userid = df.iloc[x]['user'] + available_timestamp = df.iloc[x]['timestamp'] + available_stress = df.iloc[x]['stress_probability'] + + for y in range(num_rows_to_insert): + imputed_timestamp = available_timestamp + np.timedelta64((y+1)*window, 's') + new_row = [available_userid, imputed_timestamp, available_stress, 0] + missing_vals.loc[len(missing_vals)] = new_row + + + df_imputed = df.append(missing_vals) + + df_imputed = df_imputed.sort_values(by=['timestamp']) + + return df_imputed + diff --git a/cerebralcortex/algorithms/stress_prediction/stress_prediction.py b/cerebralcortex/algorithms/stress_prediction/stress_prediction.py new file mode 100644 index 0000000..40f9d5c --- /dev/null +++ b/cerebralcortex/algorithms/stress_prediction/stress_prediction.py @@ -0,0 +1,65 @@ +# Copyright (c) 2017, MD2K Center of Excellence +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from typing import List +import numpy as np +from scipy import signal +from scipy.stats import iqr +from scipy.stats.mstats_basic import winsorize +from enum import Enum +from pyspark.sql.types import StructField, StructType, StringType, FloatType, TimestampType, ArrayType +from pyspark.sql.functions import pandas_udf, PandasUDFType +import pandas as pd +import datetime +import pickle + + +schema = StructType([ + StructField("user", StringType()), + StructField("timestamp", TimestampType()), + StructField("stress_probability", FloatType()), +]) + + +@pandas_udf(schema, PandasUDFType.GROUPED_MAP) +def stress_prediction(data: object) -> object: + + num_rows = len(data['rr_feature'].values) + + fm = np.zeros((num_rows, 11)) + for c in range(num_rows): + for k in range(11): + fm[c][k] = data['rr_feature'].values[c][k] + + clf_ecg = pickle.load(open('/home/a/stress_classifier/classifier_for_ecg.p','rb')) + predicted = clf_ecg.predict_proba(fm) + + df = pd.DataFrame(index = np.arange(0, len(data['timestamp'].values)), columns=['user', 'timestamp', 'stress_probability']) + user = data['user'].values[0] + for c in range(len(data['timestamp'].values)): + ts = data['timestamp'].values[c] + prob = predicted[c][1] + df.loc[c] = [user, ts, prob] + + return df + diff --git a/cerebralcortex/core/config_manager/config_handler.py b/cerebralcortex/core/config_manager/config_handler.py index 8118b17..8672721 100755 --- a/cerebralcortex/core/config_manager/config_handler.py +++ b/cerebralcortex/core/config_manager/config_handler.py @@ -24,6 +24,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import yaml +import os class ConfigHandler: @@ -49,5 +50,13 @@ def load_file(self, filepath: str): if "object_storage" in self.config and self.config["object_storage"]["object_storage_path"]!="" and self.config["object_storage"]["object_storage_path"][-1] !="/": self.config["object_storage"]["object_storage_path"]+="/" + # if "data_ingestion" in self.config and self.config["data_ingestion"]["data_dir_path"]!="" and self.config["data_ingestion"]["data_dir_path"][-1] !="/": + # self.config["data_ingestion"]["data_dir_path"]+="/" + if "data_dir" in self.config and self.config["data_dir"]!="" and self.config["data_dir"][-1] !="/": self.config["data_dir"]+="/" + + if "log_files_path" in self.config and self.config["cc"]["log_files_path"]!="" and self.config["cc"]["log_files_path"][-1]!="": + self.config["cc"]["log_files_path"] +="/" + if not os.access(self.config["cc"]["log_files_path"], os.W_OK): + raise Exception(self.config["cc"]["log_files_path"]+" path is not writable. 
Please check your cerebralcortex.yml configurations for 'log_files_path'.") diff --git a/cerebralcortex/core/data_manager/raw/storage_filesystem.py b/cerebralcortex/core/data_manager/raw/storage_filesystem.py index 1998086..05f8a01 100644 --- a/cerebralcortex/core/data_manager/raw/storage_filesystem.py +++ b/cerebralcortex/core/data_manager/raw/storage_filesystem.py @@ -97,7 +97,7 @@ def read_file(self, stream_name:str, version:str="all", user_id:str=None)->objec # df = df.withColumn('version', lit(int(version))) # return df - def write_file(self, stream_name:str, data:DataStream.data) -> bool: + def write_file(self, stream_name:str, data:DataStream.data, file_mode) -> bool: """ Write pyspark DataFrame to a file storage system @@ -113,7 +113,7 @@ def write_file(self, stream_name:str, data:DataStream.data) -> bool: if isinstance(data, pd.DataFrame): return self.write_pandas_dataframe(stream_name, data) else: - return self.write_spark_dataframe(stream_name, data) + return self.write_spark_dataframe(stream_name, data, file_mode) # hdfs_url = self._get_storage_path(stream_name) # try: @@ -122,10 +122,10 @@ def write_file(self, stream_name:str, data:DataStream.data) -> bool: # except Exception as e: # raise Exception("Cannot store dataframe: "+str(e)) - def write_spark_dataframe(self, stream_name, data): + def write_spark_dataframe(self, stream_name, data,file_mode): hdfs_url = self._get_storage_path(stream_name) try: - data.write.partitionBy(["version","user"]).format('parquet').mode('overwrite').save(hdfs_url) + data.write.partitionBy(["version","user"]).format('parquet').mode(file_mode).save(hdfs_url) return True except Exception as e: raise Exception("Cannot store dataframe: "+str(e)) diff --git a/cerebralcortex/core/data_manager/raw/stream_handler.py b/cerebralcortex/core/data_manager/raw/stream_handler.py index 7b029f3..c92418d 100755 --- a/cerebralcortex/core/data_manager/raw/stream_handler.py +++ b/cerebralcortex/core/data_manager/raw/stream_handler.py @@ -107,7 +107,7 @@ def get_stream(self, stream_name:str, version:str, user_id:str=None, data_type=D ################################################################### ################## STORE DATA METHODS ############################# ################################################################### - def save_stream(self, datastream, ingestInfluxDB=False)->bool: + def save_stream(self, datastream, file_mode="append", ingestInfluxDB=False, publishOnKafka=False)->bool: """ Saves datastream raw data in selected NoSQL storage and metadata in MySQL. 
@@ -142,21 +142,19 @@ def save_stream(self, datastream, ingestInfluxDB=False)->bool: if 'user' not in column_names: raise Exception("user column is missing in data schema") - if 'ver' in column_names: - data = data.drop('ver') - + data = self._drop_column(data, column_names) result = self.sql_data.save_stream_metadata(metadata) + if result["status"]==True: version = result["version"] - if "version" in column_names: - data = data.drop('version') + if isinstance(data, pd.DataFrame): data["version"] = version else: data = data.withColumn('version', lit(version)) - status = self.nosql.write_file(stream_name, data) + status = self.nosql.write_file(stream_name, data, file_mode) return status else: print("Something went wrong in saving data points in SQL store.") @@ -168,6 +166,13 @@ def save_stream(self, datastream, ingestInfluxDB=False)->bool: else: raise Exception("Metadata cannot be empty.") + def _drop_column(selfd, data, column_names): + if 'version' in column_names: + if isinstance(data, pd.DataFrame): + del data["version"] + else: + data = data.drop('version') + return data def __update_data_desciptor(self, data, metadata): """ Read pyspark dataframe clumns and add each column name and type to datadescriptor field @@ -207,7 +212,7 @@ def __update_data_desciptor(self, data, metadata): new_dd.append(dd) if len(tmp)!=len(new_dd): - raise Exception("Data descriptor number of columns does not match with the actual number of dataframe columns. Add datadescription for each of dataframe column.") + raise Exception("Data descriptor number of columns does not match with the actual number of dataframe columns. Add data description for each of dataframe column.") updated_data_descriptors = [] for (datadescipt,column_names) in zip(new_dd, tmp): diff --git a/cerebralcortex/core/data_manager/sql/data.py b/cerebralcortex/core/data_manager/sql/data.py index d86b6f1..c6b6e91 100755 --- a/cerebralcortex/core/data_manager/sql/data.py +++ b/cerebralcortex/core/data_manager/sql/data.py @@ -116,7 +116,7 @@ def close(self, conn, cursor): except Exception as exp: raise Exception(exp) - def execute(self, sql, args=None, commit=False)->List[dict]: + def execute(self, sql, args=None, commit=False, executemany=False)->List[dict]: """ Execute a sql, it could be with args and with out args. The usage is similar with execute() function in module pymysql. @@ -125,6 +125,7 @@ def execute(self, sql, args=None, commit=False)->List[dict]: sql (str): sql clause args (tuple): args need by sql clause commit (bool): whether to commit + executemany (bool): execute batch Returns: list[dict]: returns a list of dicts if commit is set to False Raises: @@ -134,7 +135,10 @@ def execute(self, sql, args=None, commit=False)->List[dict]: conn = self.pool.get_connection() cursor = conn.cursor(dictionary=True) if args: - cursor.execute(sql, args) + if executemany: + cursor.executemany(sql, args) + else: + cursor.execute(sql, args) else: cursor.execute(sql) if commit is True: diff --git a/cerebralcortex/core/data_manager/sql/data_ingestion_handler.py b/cerebralcortex/core/data_manager/sql/data_ingestion_handler.py index ee3f0ff..54c79a5 100644 --- a/cerebralcortex/core/data_manager/sql/data_ingestion_handler.py +++ b/cerebralcortex/core/data_manager/sql/data_ingestion_handler.py @@ -24,6 +24,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
import json +import re class DataIngestionHandler(): @@ -33,7 +34,7 @@ class DataIngestionHandler(): ################################################################### def add_ingestion_log(self, user_id: str = "", stream_name: str = "", file_path: str = "", fault_type: str = "", - fault_description: str = "", success: int = None) -> bool: + fault_description: str = "", success: int = None, metadata=None) -> bool: """ Log errors and success of each record during data import process. @@ -44,6 +45,7 @@ def add_ingestion_log(self, user_id: str = "", stream_name: str = "", file_path: fault_type (str): error type fault_description (str): error details success (int): 1 if data was successfully ingested, 0 otherwise + metadata (dict): (optional) metadata of a stream Returns: bool @@ -56,8 +58,12 @@ def add_ingestion_log(self, user_id: str = "", stream_name: str = "", file_path: if not user_id or not file_path or not fault_type or success is None: raise ValueError("user_id, file_path, fault_type, and success are mandatory parameters.") - qry = "INSERT IGNORE INTO " + self.ingestionLogsTable + " (user_id, stream_name, file_path, fault_type, fault_description, success) VALUES(%s, %s, %s, %s, %s, %s)" - vals = str(user_id), str(stream_name), str(file_path), str(fault_type), json.dumps(fault_description), success + if metadata: + qry = "INSERT IGNORE INTO " + self.ingestionLogsTable + " (user_id, stream_name, file_path, fault_type, fault_description, success, metadata) VALUES(%s, %s, %s, %s, %s, %s, %s)" + vals = str(user_id), str(stream_name), str(file_path), str(fault_type), json.dumps(fault_description), success, json.dumps(metadata) + else: + qry = "INSERT IGNORE INTO " + self.ingestionLogsTable + " (user_id, stream_name, file_path, fault_type, fault_description, success) VALUES(%s, %s, %s, %s, %s, %s)" + vals = str(user_id), str(stream_name), str(file_path), str(fault_type), json.dumps(fault_description), success try: self.execute(qry, vals, commit=True) @@ -65,9 +71,67 @@ def add_ingestion_log(self, user_id: str = "", stream_name: str = "", file_path: except Exception as e: raise Exception(e) + def update_ingestion_log(self, file_path: str = "", fault_type: str = "", fault_description: str = "", success: int = None) -> bool: + """ + update ingestion Logs of each record during data import process. + + Args: + file_path (str): filename with its path + fault_type (str): error type + fault_description (str): error details + success (int): 1 if data was successfully ingested, 0 otherwise + + Returns: + bool + + Raises: + ValeError: if + Exception: if sql query fails user_id, file_path, fault_type, or success parameters is missing + """ + + if not fault_type or success is None: + raise ValueError("fault_type and success are mandatory parameters.") + fault_description=re.sub('[^A-Za-z0-9:><]+', ' ', fault_description) + + qry = "UPDATE " + self.ingestionLogsTable + " SET fault_type=%s, fault_description=%s, success=%s where file_path=%s" + vals = str(fault_type), str(fault_description), success, str(file_path) + + try: + self.execute(qry, vals, commit=True) + return True + except Exception as e: + raise Exception(e) + + def add_scanned_files(self, user_id: str, stream_name: str, metadata:dict, files_list: list) -> bool: + """ + Add scanned files in ingestion log table that could be processed later on. This method is specific to MD2K data ingestion. 
+ + Args: + user_id (str): id of a user + stream_name (str): name of a stream + metadata (dict): raw metadata + files_list (list): list of filenames with its path + + Returns: + bool + + Raises: + Exception: if sql query fails + """ + rows = [] + qry = "INSERT IGNORE INTO " + self.ingestionLogsTable + " (user_id, stream_name, metadata, file_path, fault_type, fault_description, success) VALUES(%s, %s, %s, %s, %s, %s, %s)" + for fp in files_list: + rows.append((str(user_id), str(stream_name), json.dumps(metadata).lower(), str(fp), "PENDING", "NOT-PROCESSED-YET", 5)) # success=1 process, success=0 error-in-processing, and success=5 no processed yet + + try: + self.execute(qry, rows, commit=True,executemany=True) + return True + except Exception as e: + raise Exception(e) + def get_processed_files_list(self, success_type=False) -> list: """ - Get a list of all the processed files + Get a list of all the processed/un-processed files Returns: list: list of all processed files list @@ -88,6 +152,59 @@ def get_processed_files_list(self, success_type=False) -> list: result.append(row["file_path"]) return result + def get_files_list(self, stream_name:str=None, user_id=None, success_type=None) -> list: + """ + Get a list of all the processed/un-processed files + + Returns: + list: list of all processed files list + """ + result = [] + if not stream_name and not user_id: + where_clause = " where success=%(success)s " + vals = {"success":success_type} + else: + where_clause = " where success=%s " + vals = (str(success_type),) + + if stream_name: + where_clause += " and stream_name=%s" + vals = vals + (stream_name,) + if user_id: + where_clause += " and user_id=%s" + vals = vals + (user_id,) + + if success_type==None: + qry = "select * from " + self.ingestionLogsTable + rows = self.execute(qry) + else: + qry = "select * from " + self.ingestionLogsTable + where_clause + rows = self.execute(qry, vals) + + if len(rows) == 0: + return result + else: + for row in rows: + result.append({"stream_name":row["stream_name"], "user_id": row["user_id"], "metadata":json.loads(row["metadata"]), "file_path":row["file_path"]}) + return result + + def is_file_processed(self, filename:str) -> bool: + """ + check if a file is processed and ingested + + Returns: + bool: True if file is already processed + """ + qry = "select file_path from " + self.ingestionLogsTable + " where file_path=%(file_path)s" + vals = {"file_path":filename} + rows = self.execute(qry, vals) + + if len(rows) > 0: + return True + else: + return False + + def get_ingestion_stats(self) -> list: """ Get stats on ingested records @@ -107,3 +224,19 @@ def get_ingestion_stats(self) -> list: for row in rows: result.append({"fault_type": row["fault_type"], "total_faults": row["total_faults"]}) return result + + def update_ingestion_log_status(self, stream_name, fault_type, fault_description, status_type, metadata={}, platform_metadata={}): + qry = "update " + self.ingestionLogsTable + " set metadata=%s, platform_metadata=%s where stream_name=%s" + vals = json.dumps(metadata), json.dumps(platform_metadata), str(stream_name) + try: + self.execute(qry, vals, commit=True) + except Exception as e: + print("Cannot update metadata in ingest_log table. 
", str(e)) + + def update_ingestion_log_status_ignore(self, stream_name, fault_type, fault_description, status_type, metadata=None): + qry = "update " + self.ingestionLogsTable + " set fault_type=%s, fault_description=%s, success=%s where stream_name=%s" + vals = fault_type, fault_description, status_type, stream_name + try: + self.execute(qry, vals, commit=True) + except Exception as e: + print("Cannot update metadata in ingest_log table. ", str(e)) \ No newline at end of file diff --git a/cerebralcortex/core/data_manager/sql/metadata_handler.py b/cerebralcortex/core/data_manager/sql/metadata_handler.py index d33a525..0896495 100755 --- a/cerebralcortex/core/data_manager/sql/metadata_handler.py +++ b/cerebralcortex/core/data_manager/sql/metadata_handler.py @@ -27,7 +27,7 @@ class MetadataHandler: - def get_corrected_metadata(self, stream_name: str, status:str=["include"]) -> dict: + def get_corrected_metadata(self) -> dict: """ Retrieves corrected metadata @@ -40,19 +40,18 @@ def get_corrected_metadata(self, stream_name: str, status:str=["include"]) -> di Raises: ValueError: if stream_name is None/empty or error in executing query """ - if not stream_name: - raise ValueError("stream_name cannot be empty.") - - - qry = "select * from " + self.correctedMetadata + " where stream_name=%(stream_name)s group by stream_name" - vals = {"stream_name": str(stream_name)} + results = [] + + qry = "select * from " + self.correctedMetadata + " group by stream_name" try: - rows = self.execute(qry, vals) + rows = self.execute(qry) if len(rows) == 0: - return {} + return [] else: - metadata = rows[0]["metadata"].lower() - return {"metadata":json.loads(metadata), "status":rows[0]["status"]} + for row in rows: + metadata = row["metadata"].lower() + results.append({"stream_name":row["stream_name"],"metadata":json.loads(metadata), "status":row["status"]}) + return results except Exception as e: raise Exception(str(e)) diff --git a/cerebralcortex/core/data_manager/sql/stream_handler.py b/cerebralcortex/core/data_manager/sql/stream_handler.py index 0c27ad4..a2b746c 100755 --- a/cerebralcortex/core/data_manager/sql/stream_handler.py +++ b/cerebralcortex/core/data_manager/sql/stream_handler.py @@ -276,22 +276,27 @@ def save_stream_metadata(self, metadata_obj)->dict: """ isQueryReady = 0 - metadata_hash = metadata_obj.get_hash() - stream_name = metadata_obj.name + + if isinstance(metadata_obj, Metadata): + metadata_hash = metadata_obj.get_hash() + metadata_obj = metadata_obj.to_json() + + else: + metadata_hash = Metadata().get_hash_by_json(metadata_obj) + + stream_name = metadata_obj.get("name") is_metadata_changed = self._is_metadata_changed(stream_name, metadata_hash) status = is_metadata_changed.get("status") version = is_metadata_changed.get("version") - metadata_obj.set_version(version) - - metadata_str = metadata_obj.to_json() + #metadata_obj = metadata_obj if (status=="exist"): return {"status": True,"version":version, "record_type":"exist"} if (status == "new"): qry = "INSERT INTO " + self.datastreamTable + " (name, version, metadata_hash, metadata) VALUES(%s, %s, %s, %s)" - vals = str(stream_name), str(version), str(metadata_hash), json.dumps(metadata_str) + vals = str(stream_name), str(version), str(metadata_hash), json.dumps(metadata_obj) isQueryReady = 1 # if nothing is changed then isQueryReady would be 0 and no database transaction would be performed diff --git a/cerebralcortex/core/data_manager/sql/users_handler.py b/cerebralcortex/core/data_manager/sql/users_handler.py index 6a27144..9f97c7f 100755 
--- a/cerebralcortex/core/data_manager/sql/users_handler.py +++ b/cerebralcortex/core/data_manager/sql/users_handler.py @@ -447,8 +447,8 @@ def username_checks(self, username:str): Raises: Exception: if username doesn't follow standards """ - regexp = re.compile(r'W') - if regexp.search(username) or len(username)>25: - raise Exception("Only alphanumeric usernames are allowed with the max length of 25 chars.") + regexp = re.compile('^\w+$') + if not regexp.search(username) or len(username)>50: + raise Exception("Only alphanumeric usernames are allowed with the max length of 50 chars.") else: return True diff --git a/cerebralcortex/core/datatypes/datastream.py b/cerebralcortex/core/datatypes/datastream.py index e5b0c61..2ba94a8 100755 --- a/cerebralcortex/core/datatypes/datastream.py +++ b/cerebralcortex/core/datatypes/datastream.py @@ -24,8 +24,16 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. from pyspark.sql import functions as F +from pyspark.sql.functions import udf, collect_list from typing import List +from pyspark.sql.types import * +#from pyspark.sql.functions import pandas_udf,PandasUDFType +from pyspark.sql.window import Window +from cerebralcortex.core.plotting.basic_plots import BasicPlots +from cerebralcortex.core.plotting.stress_plots import StressStreamPlots + import re +import sys from cerebralcortex.core.metadata_manager.stream.metadata import Metadata @@ -46,6 +54,8 @@ def __init__(self, self._data = data self._metadata = metadata + self._basic_plots = BasicPlots() + self._stress_plots = StressStreamPlots() def get_metadata(self, version:int=None)->Metadata: """ @@ -116,16 +126,24 @@ def data(self, value): def to_pandas(self): """ This method converts pyspark dataframe into pandas dataframe. + Notes: This method will collect all the data on master node to convert pyspark dataframe into pandas dataframe. After converting to pandas dataframe datastream objects helper methods will not be accessible. + + Returns: + Datastream (Metadata, pandas.DataFrame): this will return a new datastream object with blank metadata + Examples: >>> CC = CerebralCortex("/directory/path/of/configs/") >>> ds = CC.get_stream("STREAM-NAME") - >>> pandas_df = ds.to_pandas() - >>> pandas_df.head() + >>> new_ds = ds.to_pandas() + >>> new_ds.data.head() """ - return DataStream(data=self._data.toPandas(), metadata=Metadata()) + pdf = self._data.toPandas() + if "timestamp" in pdf.columns: + pdf = pdf.sort_values('timestamp') + return DataStream(data=pdf, metadata=Metadata()) def collect(self): @@ -140,12 +158,12 @@ def collect(self): # !!!! STAT METHODS !!! - def compute_average(self, windowDuration:int=60, colmnName:str=None)->object: + def compute_average(self, windowDuration:int=None, colmnName:str=None)->object: """ Window data and compute average of a windowed data of a single or all columns Args: - windowDuration (int): duration of a window in seconds + windowDuration (int): duration of a window in seconds. 
If it is not set then stats will be computed for the whole data in a column(s) colmnName (str): average will be computed for all the columns if columnName param is not provided (for all windows) Returns: @@ -153,12 +171,12 @@ def compute_average(self, windowDuration:int=60, colmnName:str=None)->object: """ return self._compute_stats(windowDuration=windowDuration, methodName="avg", columnName=colmnName) - def compute_sqrt(self, windowDuration:int=60, colmnName:str=None)->object: + def compute_sqrt(self, windowDuration:int=None, colmnName:str=None)->object: """ Window data and compute square root of a windowed data of a single or all columns Args: - windowDuration (int): duration of a window in seconds + windowDuration (int): duration of a window in seconds. If it is not set then stats will be computed for the whole data in a column(s) colmnName (str): square root will be computed for all the columns if columnName param is not provided (for all windows) Returns: @@ -166,12 +184,12 @@ def compute_sqrt(self, windowDuration:int=60, colmnName:str=None)->object: """ return self._compute_stats(windowDuration=windowDuration, methodName="sqrt", columnName=colmnName) - def compute_sum(self, windowDuration:int=60, colmnName:str=None)->object: + def compute_sum(self, windowDuration:int=None, colmnName:str=None)->object: """ Window data and compute sum of a windowed data of a single or all columns Args: - windowDuration (int): duration of a window in seconds + windowDuration (int): duration of a window in seconds. If it is not set then stats will be computed for the whole data in a column(s) colmnName (str): average will be computed for all the columns if columnName param is not provided (for all windows) Returns: @@ -179,12 +197,12 @@ def compute_sum(self, windowDuration:int=60, colmnName:str=None)->object: """ return self._compute_stats(windowDuration=windowDuration, methodName="sum", columnName=colmnName) - def compute_variancee(self, windowDuration:int=60, colmnName:str=None)->object: + def compute_variance(self, windowDuration:int=None, colmnName:str=None)->object: """ Window data and compute variance of a windowed data of a single or all columns Args: - windowDuration (int): duration of a window in seconds + windowDuration (int): duration of a window in seconds. If it is not set then stats will be computed for the whole data in a column(s) colmnName (str): variance will be computed for all the columns if columnName param is not provided (for all windows) Returns: @@ -192,12 +210,12 @@ def compute_variancee(self, windowDuration:int=60, colmnName:str=None)->object: """ return self._compute_stats(windowDuration=windowDuration, methodName="variance", columnName=colmnName) - def compute_stddev(self, windowDuration:int=60, colmnName:str=None)->object: + def compute_stddev(self, windowDuration:int=None, colmnName:str=None)->object: """ Window data and compute standard deviation of a windowed data of a single or all columns Args: - windowDuration (int): duration of a window in seconds + windowDuration (int): duration of a window in seconds. 
If it is not set then stats will be computed for the whole data in a column(s) colmnName (str): standard deviation will be computed for all the columns if columnName param is not provided (for all windows) Returns: @@ -205,12 +223,12 @@ def compute_stddev(self, windowDuration:int=60, colmnName:str=None)->object: """ return self._compute_stats(windowDuration=windowDuration, methodName="stddev", columnName=colmnName) - def compute_min(self, windowDuration:int=60, colmnName:str=None)->object: + def compute_min(self, windowDuration:int=None, colmnName:str=None)->object: """ Window data and compute min of a windowed data of a single or all columns Args: - windowDuration (int): duration of a window in seconds + windowDuration (int): duration of a window in seconds. If it is not set then stats will be computed for the whole data in a column(s) colmnName (str): min value will be computed for all the columns if columnName param is not provided (for all windows) Returns: @@ -218,12 +236,12 @@ def compute_min(self, windowDuration:int=60, colmnName:str=None)->object: """ return self._compute_stats(windowDuration=windowDuration, methodName="min", columnName=colmnName) - def compute_max(self, windowDuration:int=60, colmnName:str=None)->object: + def compute_max(self, windowDuration:int=None, colmnName:str=None)->object: """ Window data and compute max of a windowed data of a single or all columns Args: - windowDuration (int): duration of a window in seconds + windowDuration (int): duration of a window in seconds. If it is not set then stats will be computed for the whole data in a column(s) colmnName (str): max will be computed for all the columns if columnName param is not provided (for all windows) Returns: @@ -232,22 +250,25 @@ def compute_max(self, windowDuration:int=60, colmnName:str=None)->object: return self._compute_stats(windowDuration=windowDuration, methodName="max", columnName=colmnName) - def _compute_stats(self, windowDuration:int=60, methodName:str=None, columnName:List[str]=[])->object: + def _compute_stats(self, windowDuration:int=None, methodName:str=None, columnName:List[str]=[])->object: """ Compute stats on pyspark dataframe Args: - windowDuration (int): duration of a window in seconds + windowDuration (int): duration of a window in seconds. If it is not set then stats will be computed for the whole data in a column(s) methodName (str): pyspark stat method name columnName (str): max will be computed for all the columns if columnName param is not provided (for all windows) Returns: DataStream: this will return a new datastream object with blank metadata """ - - windowDuration = str(windowDuration)+" seconds" exprs = self._get_column_names(columnName=columnName, methodName=methodName) - result = self._data.groupBy(['user',F.window("timestamp", windowDuration)]).agg(exprs) + if windowDuration: + windowDuration = str(windowDuration)+" seconds" + win = F.window("timestamp", windowDuration) + result = self._data.groupBy(['user',win]).agg(exprs) + else: + result = self._data.groupBy(['user']).agg(exprs) result = self._update_column_names(result) return DataStream(data=result, metadata=Metadata()) @@ -255,7 +276,7 @@ def _compute_stats(self, windowDuration:int=60, methodName:str=None, columnName: # !!!! WINDOWING METHODS !!! 
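A minimal usage sketch (assuming a local Spark session and invented column names) of what the now-optional windowDuration changes in _compute_stats above: with no duration the aggregate is computed per user over the whole stream, with a duration it is computed per user per fixed-length window.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame(
    [("u1", "2019-01-09 11:49:28", 1, 92), ("u1", "2019-01-09 11:50:28", 1, 90)],
    ["user", "timestamp", "version", "battery_level"],
).withColumn("timestamp", F.col("timestamp").cast("timestamp"))

# windowDuration=None -> one aggregate row per user over all of its data
df.groupBy(["user"]).agg({"battery_level": "avg"}).show(truncate=False)

# windowDuration=60 -> one aggregate row per user per 60-second window
df.groupBy(["user", F.window("timestamp", "60 seconds")]).agg({"battery_level": "avg"}).show(truncate=False)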
- def window(self, windowDuration:int=60, groupByColumnName:List[str]=[], columnName:List[str]=[], slideDuration:int=None, startTime=None): + def window(self, windowDuration:int=60, groupByColumnName:List[str]=[], columnName:List[str]=[], slideDuration:int=None, startTime=None, preserve_ts=False): """ Window data into fixed length chunks. If no columnName is provided then the windowing will be performed on all the columns. @@ -264,7 +285,8 @@ def window(self, windowDuration:int=60, groupByColumnName:List[str]=[], columnNa groupByColumnName List[str]: groupby column names, for example, groupby user, col1, col2 columnName List[str]: column names on which windowing should be performed. Windowing will be performed on all columns if none is provided slideDuration (int): slide duration of a window - startTime (datetime): start time of window. First time of data will be used as startTime if none is provided + startTime (datetime): The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide startTime as 15 minutes. First time of data will be used as startTime if none is provided + preserve_ts (bool): setting this to True will return timestamps of corresponding to each windowed value Returns: DataStream: this will return a new datastream object with blank metadata Note: @@ -272,13 +294,17 @@ def window(self, windowDuration:int=60, groupByColumnName:List[str]=[], columnNa """ windowDuration = str(windowDuration)+" seconds" - - - exprs = self._get_column_names(columnName=columnName, methodName="collect_list") + if slideDuration is not None: + slideDuration = str(slideDuration)+" seconds" + exprs = self._get_column_names(columnName=columnName, methodName="collect_list", preserve_ts=preserve_ts) + win = F.window("timestamp", windowDuration=windowDuration, slideDuration=slideDuration, startTime=startTime) if len(groupByColumnName)>0: - windowed_data = self._data.groupBy(['user','timestamp', F.window("timestamp", windowDuration=windowDuration, slideDuration=slideDuration, startTime=startTime)]).agg(exprs) + groupByColumnName.append("user") + groupByColumnName.append("version") + groupByColumnName.append(win) + windowed_data = self._data.groupBy(groupByColumnName).agg(exprs) else: - windowed_data = self._data.groupBy(['user',F.window("timestamp", windowDuration=windowDuration, slideDuration=slideDuration, startTime=startTime)]).agg(exprs) + windowed_data = self._data.groupBy(['user','version',win]).agg(exprs) data = windowed_data @@ -309,6 +335,12 @@ def drop_column(self, *args, **kwargs): data = self._data.drop(*args, **kwargs) return DataStream(data=data, metadata=Metadata()) + def summary(self): + """ + print the summary of the data + """ + self._data.describe().show(truncate=False) + def limit(self, *args, **kwargs): """ calls deafult dataframe limit @@ -347,6 +379,21 @@ def filter(self, columnName, operator, value): data = self._data.where(where_clause) return DataStream(data=data, metadata=Metadata()) + def map_stream(self, window_ds): + """ + Map/join a stream to a windowed stream + + Args: + window_ds (Datastream): windowed datastream object + + Returns: + Datastream: joined/mapped stream + + """ + window_ds = window_ds.data.drop("version", "user") + df= window_ds.join(self.data, self.data.timestamp.between(F.col("window.start"), F.col("window.end"))) + return DataStream(data=df, metadata=Metadata()) + def 
filter_user(self, user_ids:List): """ filter data to get only selective users' data @@ -379,7 +426,7 @@ def filter_version(self, version:List): data = self._data.where(self._data["version"].isin(version)) return DataStream(data=data, metadata=Metadata()) - def groupby(self, columnName): + def groupby(self, *columnName): """ Group data by column name Args: @@ -388,7 +435,7 @@ def groupby(self, columnName): Returns: """ - data = self._data.groupby(columnName) + data = self._data.groupby(*columnName) return DataStream(data=data, metadata=Metadata()) # def win(self, udfName): @@ -396,10 +443,106 @@ def groupby(self, columnName): # self.metadata = Metadata() # return self - def compute(self, udfName): - data = self._data.apply(udfName) + def compute(self, udfName, timeInterval=None): + if 'custom_window' in self._data.columns: + data = self._data.groupby('user','custom_window').apply(udfName) + else: + data = self._data.groupby('user').apply(udfName) + return DataStream(data=data, metadata=Metadata()) + + def run_algorithm(self, udfName, columnNames:List[str]=[], windowDuration:int=60, slideDuration:int=None, groupByColumnName:List[str]=[], startTime=None, preserve_ts=False): + """ + Run an algorithm + + Args: + udfName: Name of the algorithm + columnName List[str]: column names on which windowing should be performed. Windowing will be performed on all columns if none is provided + windowDuration (int): duration of a window in seconds + slideDuration (int): slide duration of a window + groupByColumnName List[str]: groupby column names, for example, groupby user, col1, col2 + startTime (datetime): The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide startTime as 15 minutes. 
First time of data will be used as startTime if none is provided + preserve_ts (bool): setting this to True will return timestamps of corresponding to each windowed value + Returns: + DataStream: this will return a new datastream object with blank metadata + + """ + windowDuration = str(windowDuration)+" seconds" + groupbycols = ["user", "version"] + + win = F.window("timestamp", windowDuration=windowDuration, slideDuration=slideDuration, startTime=startTime) + + if len(groupByColumnName)>0: + groupbycols.extend(groupByColumnName) + + groupbycols.append(win) + + if len(columnNames)==0: + raise ValueError("columnNames list cannot be empty.") + + tmp = "" + for col in columnNames: + tmp += "collect_list({}{}{}){}".format('"',col,'"',",") + + tmp = "{}{}{}{}".format("udfName", "(", tmp.rstrip(","), ")") + merged_column = self._data.groupBy(groupbycols).agg(eval(tmp).alias("merged_column")) + + cols = merged_column.schema.fields + new_cols = ["timestamp"] + for col in cols: + if col.name=="merged_column": + for cl in col.dataType.names: + new_cols.append("merged_column."+cl) + else: + new_cols.append(col.name) + + merged_column=merged_column.withColumn("timestamp", merged_column.window.start) + + data = merged_column.select(new_cols) + + return DataStream(data=data, metadata=Metadata()) + + def sort(self, columnNames:list=[], ascending=True): + """ + Sort data column in ASC or DESC order + + Returns: + object: DataStream object + """ + ascending_list = [] + if len(columnNames)==0: + columnNames.append("timestamp") + + for col in columnNames: + if ascending: + ascending_list.append(1) + else: + ascending_list.append(0) + data = self._data.orderBy(columnNames,ascending=ascending_list) return DataStream(data=data, metadata=Metadata()) + # def run_algo(self, udfName, windowSize:str="1 minute"): + # """ + # + # Args: + # udfName: Name of the algorithm + # windowSize: acceptable_params are "1 second", "1 minute", "1 hour" OR "1 day" + # """ + # acceptable_params = ["1 second", "1 minute", "1 hour", "1 day"] + # + # if windowSize=="1 second": + # extended_df = self._data.withColumn("groupby_col",F.concat(F.col("timestamp").cast("date"), F.lit("-"), F.hour(F.col("timestamp")), F.lit("-"), F.minute(F.col("timestamp")), F.lit("-"), F.second(F.col("timestamp"))).cast("string")) + # elif windowSize=="1 minute": + # extended_df = self._data.withColumn("groupby_col",F.concat(F.col("timestamp").cast("date"), F.lit("-"), F.hour(F.col("timestamp")), F.lit("-"), F.minute(F.col("timestamp"))).cast("string")) + # elif windowSize=="1 hour": + # extended_df = self._data.withColumn("groupby_col",F.concat(F.col("timestamp").cast("date"), F.lit("-"), F.hour(F.col("timestamp"))).cast("string")) + # elif windowSize=="1 day": + # extended_df = self._data.withColumn("groupby_col",F.concat(F.col("timestamp").cast("date")).cast("string")) + # else: + # raise ValueError(str(windowSize)+" is not an acceptable param. 
Acceptable params are only: "+" OR ".join(acceptable_params)) + # + # data = extended_df.groupBy("user","version","groupby_col").apply(udfName) + # return DataStream(data=data, metadata=Metadata()) + def show(self, *args, **kwargs): self._data.show(*args, **kwargs) @@ -412,7 +555,7 @@ def schema(self): """ return self._data.schema - def _get_column_names(self, columnName:List[str], methodName:str): + def _get_column_names(self, columnName:List[str], methodName:str, preserve_ts:bool=False): """ Get data column names and build expression for pyspark aggregate method @@ -425,11 +568,15 @@ def _get_column_names(self, columnName:List[str], methodName:str): dict: {columnName: methodName} """ columns = self._data.columns + black_list_column = ["timestamp", "localtime", "user", "version"] - if "localtime" in columns: - black_list_column = ["timestamp", "localtime", "user", "version"] - else: - black_list_column = ["timestamp", "user", "version"] + if "localtime" not in columns: + black_list_column.pop(1) + elif preserve_ts: + black_list_column.pop(1) + + if preserve_ts: + black_list_column.pop(0) if columnName: if isinstance(columns, str): @@ -441,3 +588,109 @@ def _get_column_names(self, columnName:List[str], methodName:str): exprs = {x: methodName for x in columns} return exprs + + +############################### PLOTS ############################### + def _sort_values(self, pdf): + if "timestamp" in pdf.columns: + return pdf.sort_values('timestamp') + return pdf + + def plot(self, y_axis_column=None): + pdf = self._data.toPandas() + pdf = self._sort_values(pdf) + self._basic_plots.timeseries(pdf, y_axis_column=y_axis_column) + + def plot_hist(self, x_axis_column=None): + pdf = self._data.toPandas() + pdf = self._sort_values(pdf) + self._basic_plots.hist(pdf, x_axis_column=x_axis_column) + + def plot_gps_cords(self, zoom=5): + pdf = self._data.toPandas() + pdf = self._sort_values(pdf) + return self._basic_plots.plot_gps_cords(pdf, zoom=zoom) + + def plot_stress_pie(self, x_axis_column="stresser_main"): + pdf = self._data.toPandas() + pdf = self._sort_values(pdf) + self._stress_plots.plot_pie(pdf, x_axis_column) + + def plot_stress_gantt(self): + pdf = self._data.toPandas() + pdf = self._sort_values(pdf) + self._stress_plots.plot_gantt(pdf) + + def plot_stress_sankey(self, cat_cols=["stresser_main","stresser_sub"], value_cols='density',title="Stressers' Sankey Diagram"): + pdf = self._data.toPandas() + pdf = self._sort_values(pdf) + self._stress_plots.plot_sankey(df=pdf,cat_cols=cat_cols, value_cols=value_cols, title=title) + + def plot_stress_bar(self, x_axis_column="stresser_main"): + pdf = self._data.toPandas() + pdf = self._sort_values(pdf) + self._stress_plots.plot_bar(pdf, x_axis_column=x_axis_column) + + def plot_stress_comparison(self, x_axis_column="stresser_main", usr_id=None, compare_with="all"): + pdf = self._data.toPandas() + pdf = self._sort_values(pdf) + self._stress_plots.plot_comparison(pdf, x_axis_column=x_axis_column, usr_id=usr_id, compare_with=compare_with) + + + +###################### New Methods by Anand ######################### + + + def join(self, dataStream, propagation='forward'): + """ + filter data + + Args: + columnName (str): name of the column + operator (str): basic operators (e.g., >, <, ==, !=) + value (Any): if the columnName is timestamp, please provide python datatime object + + Returns: + DataStream: this will return a new datastream object with blank metadata + """ + combined_df = self._data.join(dataStream.data, on = ['user','timestamp', 
'localtime','version'], how='full').orderBy('timestamp') + combined_filled = combined_df.withColumn("data_quality", F.last('data_quality', True).over(Window.partitionBy('user').orderBy('timestamp').rowsBetween(-sys.maxsize, 0))) + combined_filled_filtered = combined_filled.filter(combined_filled.ecg.isNotNull()) + + return DataStream(data=combined_filled_filtered, metadata=Metadata()) + + + + def create_windows(self, window_length='hour'): + """ + filter data + + Args: + columnName (str): name of the column + operator (str): basic operators (e.g., >, <, ==, !=) + value (Any): if the columnName is timestamp, please provide python datatime object + + Returns: + DataStream: this will return a new datastream object with blank metadata + """ + windowed_df = self._data.withColumn('custom_window', windowing_udf('timestamp')) + return DataStream(data=windowed_df, metadata=Metadata()) + +from pyspark.sql.functions import hour, mean +""" +Windowing function to customize the parallelization of computation. +""" +def get_window(x): + u = '_' + y = x.year + m = x.month + d = x.day + h = x.hour + mi = x.minute + s = str(y) + u + str(m) + u + str(d) + u + str(h) #+ u + str(mi) + + return s + +windowing_udf = udf(get_window, StringType()) + + diff --git a/cerebralcortex/core/log_manager/log_handler.py b/cerebralcortex/core/log_manager/log_handler.py index 23309ff..1ebcb6b 100755 --- a/cerebralcortex/core/log_manager/log_handler.py +++ b/cerebralcortex/core/log_manager/log_handler.py @@ -24,67 +24,67 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import logging import inspect -import syslog -from datetime import datetime +import os class LogTypes(): EXCEPTION = 1, CRITICAL = 2, ERROR = 3, - WARNING = 4, + WARNING=4, MISSING_DATA = 5, DEBUG = 6 class LogHandler(): - def logsyslog(self, loglevel: str, message: str): - """ - Initialize logging - Args: - loglevel (str): log level (e.g., warning, critical etc.) - message (str): log message - """ - syslog.openlog(ident="CerebralCortex") - syslog.syslog(loglevel, message) - syslog.closelog() + def log(self, error_message="", error_type=LogTypes.EXCEPTION): - def log(self, error_message: str = "", error_type=LogTypes.EXCEPTION): - """ - Log errors and warnings in log file and print on console of debug is set to True + if not os.path.exists(self.log_path): + os.makedirs(self.log_path) + + FORMAT = '[%(asctime)s] - %(message)s' - Args: - error_message (str): error message - error_type (LogTypes): error type (e.g., warning, critical etc.) 
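Stepping back to the custom_window helper added to datastream.py just above: a small self-contained illustration (not part of the patch) of how get_window() buckets timestamps down to the hour, so that compute() can group by ('user', 'custom_window') instead of a Spark time window.

from datetime import datetime

def get_window(x):
    # same logic as the windowing_udf above; minutes are intentionally ignored
    u = '_'
    return str(x.year) + u + str(x.month) + u + str(x.day) + u + str(x.hour)

assert get_window(datetime(2019, 1, 9, 11, 49, 28)) == "2019_1_9_11"
assert get_window(datetime(2019, 1, 9, 11, 55, 0)) == "2019_1_9_11"  # same hourly bucket
assert get_window(datetime(2019, 1, 9, 12, 0, 1)) == "2019_1_9_12"   # next hour, new group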
- """ execution_stats = inspect.stack() method_name = execution_stats[1][3] file_name = execution_stats[1][1] line_number = execution_stats[1][2] - error_message = str(datetime.now()) + " - [" + str(file_name) + " - " + str(method_name) + " - " + str( - line_number) + "] - " + str(error_message) + error_message = "[" + str(file_name) + " - " + str(method_name) + " - " + str(line_number) + "] - " + str(error_message) - if error_type == LogTypes.CRITICAL: - self.logsyslog(syslog.LOG_CRIT, error_message) + if error_type==LogTypes.CRITICAL: + logs_filename = self.log_path+"critical.log" + logging.basicConfig(filename=logs_filename,level=logging.CRITICAL, format=FORMAT) + logging.critical(error_message) elif error_type == LogTypes.ERROR: - self.logsyslog(syslog.LOG_ERR, error_message) + logs_filename = self.log_path+"error.log" + logging.basicConfig(filename=logs_filename,level=logging.ERROR, format=FORMAT) + logging.error(error_message) elif error_type == LogTypes.EXCEPTION: - self.logsyslog(syslog.LOG_ERR, error_message) + logs_filename = self.log_path+"error.log" + logging.basicConfig(filename=logs_filename,level=logging.ERROR, format=FORMAT) + logging.exception(error_message) elif error_type == LogTypes.WARNING: - self.logsyslog(syslog.LOG_WARNING, error_message) + logs_filename = self.log_path+"warning.log" + logging.basicConfig(filename=logs_filename,level=logging.WARNING, format=FORMAT) + logging.warning(error_message) elif error_type == LogTypes.DEBUG: - self.logsyslog(syslog.LOG_DEBUG, error_message) + logs_filename = self.log_path+"debug.log" + logging.basicConfig(filename=logs_filename,level=logging.DEBUG, format=FORMAT) + logging.debug(error_message) elif error_type == LogTypes.MISSING_DATA: - error_message = 'MISSING_DATA ' + error_message - self.logsyslog(syslog.LOG_ERR, error_message) + logs_filename = self.log_path+"missing_data.log" + logging.basicConfig(filename=logs_filename,level=logging.WARNING, format=FORMAT) + logging.warning(error_message) else: - self.logsyslog(syslog.LOG_INFO, error_message) + logs_filename = self.log_path+"info.log" + logging.basicConfig(filename=logs_filename,level=logging.INFO, format=FORMAT) + logging.info(error_message) if self.debug: print(error_message) if self.throw_exception: - raise Exception(error_message) + raise Exception(error_message) \ No newline at end of file diff --git a/cerebralcortex/core/log_manager/logging.py b/cerebralcortex/core/log_manager/logging.py index 90b15a5..5a2b92e 100755 --- a/cerebralcortex/core/log_manager/logging.py +++ b/cerebralcortex/core/log_manager/logging.py @@ -38,3 +38,4 @@ def __init__(self, CC): """ self.debug = CC.debug self.throw_exception = CC.config["cc"]["throw_exception"] + self.log_path = CC.config["cc"]["log_files_path"] diff --git a/cerebralcortex/core/metadata_manager/stream/data_descriptor.py b/cerebralcortex/core/metadata_manager/stream/data_descriptor.py index ca888aa..5ff9f79 100755 --- a/cerebralcortex/core/metadata_manager/stream/data_descriptor.py +++ b/cerebralcortex/core/metadata_manager/stream/data_descriptor.py @@ -31,7 +31,7 @@ def __init__(self): """ self.name = None self.type = None - self.attributes = [] + self.attributes = {} def set_attribute(self, key, value): """ @@ -49,7 +49,7 @@ def set_attribute(self, key, value): """ if key is None or key=="" or value is None or value=="": raise ValueError("Key and/or value cannot be None or empty.") - self.attributes.append({key:value}) + self.attributes[key]=value return self def set_name(self, value): diff --git 
a/cerebralcortex/core/metadata_manager/stream/metadata.py b/cerebralcortex/core/metadata_manager/stream/metadata.py index 6a86526..8f5555f 100755 --- a/cerebralcortex/core/metadata_manager/stream/metadata.py +++ b/cerebralcortex/core/metadata_manager/stream/metadata.py @@ -38,7 +38,6 @@ def __init__(self): """ self.name = None - self.version = None self.description = "" self.metadata_hash = None self.input_streams = [] @@ -163,20 +162,6 @@ def set_name(self, value:str): self.name = value return self - def set_version(self, value:int): - """ - set version of a stream - - Args: - value (int): version of a stream - - Returns: - self - - """ - self.version = value - return self - def set_description(self, stream_description:str): """ Add stream description @@ -254,15 +239,16 @@ def is_valid(self)->bool: ValueError: if metadata fields are not set """ + if not self.name: raise ValueError("Stream name is not defined.") if not self.description: raise ValueError("Stream description is not defined.") if len(self.data_descriptor)==0: raise Exception("Data descriptor length cannot be 0.") - for dd_obj in self.data_descriptor: - if (dd_obj.attributes is None or len(dd_obj.attributes)==0): - raise ValueError("Add brief description for each column in data desciptor. For example, DataDescriptor().set_attribute('description'', 'sleep time''))") + # for dd_obj in self.data_descriptor: + # if (dd_obj.attributes is None or len(dd_obj.attributes)==0): + # raise ValueError("Add brief description for each column in data desciptor. For example, DataDescriptor().set_attribute('description'', 'sleep time''))") for mm_obj in self.modules: if (mm_obj.name is None or mm_obj.name==""): raise ValueError("Module name and/or version fields are missing in module info.") @@ -299,20 +285,44 @@ def get_hash(self)->str: Returns: str: hash id of metadata + """ name = self.name - version = self.version data_descriptor = "" modules = "" for dd in self.data_descriptor: data_descriptor += str(dd.name)+str(dd.type) for mm in self.modules: modules += str(mm.name) + str(mm.version) + str(mm.authors) - hash_string = str(name)+str(version)+str(data_descriptor)+str(modules) + hash_string = str(name)+"None"+str(data_descriptor)+str(modules) + hash_string = hash_string.strip().lower().replace(" ", "") + + return str(uuid.uuid3(uuid.NAMESPACE_DNS, hash_string)) + + def get_hash_by_json(self, metadata:dict=None)->str: + """ + Get the unique hash of metadata. 
Hash is generated based on "stream-name + data_descriptor + module-metadata" + + Args: + metadata: only pass this if this method is used on a dict object outside of Metadata class + Returns: + str: hash id of metadata + + """ + + name = metadata.get("name") + data_descriptor = "" + modules = "" + for dd in metadata.get("data_descriptor"): + data_descriptor += str(dd.get("name"))+str(dd.get("type")) + for mm in metadata.get("modules"): + modules += str(mm.get("name")) + str(mm.get("version")) + str(mm.get("authors")) + hash_string = str(name)+"None"+str(data_descriptor)+str(modules) hash_string = hash_string.strip().lower().replace(" ", "") return str(uuid.uuid3(uuid.NAMESPACE_DNS, hash_string)) + def from_json_sql(self, metadata_json: dict)->List: """ Convert dict (json) objects into Metadata class objects @@ -401,4 +411,10 @@ def from_json_file(self, metadata: dict)->List: return md def __repr__(self): - return str(self.__dict__) \ No newline at end of file + data = self.to_json() + return json.dumps(data, indent=4, sort_keys=True) + #return str(self.__dict__) + + ###################################### Overridden Python methods ########################################## + # def __str__(self): + # pass \ No newline at end of file diff --git a/cerebralcortex/core/metadata_manager/stream/module_info.py b/cerebralcortex/core/metadata_manager/stream/module_info.py index 893443a..9ec40b0 100755 --- a/cerebralcortex/core/metadata_manager/stream/module_info.py +++ b/cerebralcortex/core/metadata_manager/stream/module_info.py @@ -33,7 +33,7 @@ def __init__(self): self.name = None self.version = None self.authors = [] - self.attributes = [] + self.attributes = {} def set_name(self, value): """ @@ -98,7 +98,7 @@ def set_attribute(self, key:str, value:str): """ if key is None or key=="" or value is None or value=="": raise ValueError("Key and/or value cannot be None or empty.") - self.attributes.append({key:value}) + self.attributes[key] =value return self def from_json(self, obj): diff --git a/cerebralcortex/core/plotting/__init__.py b/cerebralcortex/core/plotting/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cerebralcortex/core/plotting/basic_plots.py b/cerebralcortex/core/plotting/basic_plots.py new file mode 100644 index 0000000..1f30312 --- /dev/null +++ b/cerebralcortex/core/plotting/basic_plots.py @@ -0,0 +1,84 @@ +# Copyright (c) 2019, MD2K Center of Excellence +# - Nasir Ali +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import plotly.plotly as py +import plotly.graph_objs as go +import pandas as pd +import random +from datetime import datetime, timedelta +import cufflinks as cf +from plotly.offline import iplot, init_notebook_mode +import pandas as pd +from datetime import datetime +from ipyleaflet import Map, Marker, MarkerCluster + + +class BasicPlots(): + def remove_cols(self, pdf, cols=["user", "version", "timestamp", "localtimestamp", "localtime", "window"]): + for col in cols: + if col in pdf.columns: + del pdf[col] + return pdf + + def timeseries(self, pdf, y_axis_column=None): + cf.set_config_file(offline=True, world_readable=True, theme='ggplot') + init_notebook_mode(connected=True) + ts = pdf['timestamp'] + pdf = self.remove_cols(pdf) + if y_axis_column: + data = [go.Scatter(x=ts, y=pdf[str(y_axis_column)])] + iplot(data, filename = 'time-series-plot') + else: + iplot([{ + 'x': ts, + 'y': pdf[col], + 'name': col + } for col in pdf.columns], filename='time-series-plot') + + def hist(self, pdf, x_axis_column=None): + cf.set_config_file(offline=True, world_readable=True, theme='ggplot') + init_notebook_mode(connected=True) + pdf = self.remove_cols(pdf) + if x_axis_column: + data = [go.Histogram(x=pdf[str(x_axis_column)])] + iplot(data, filename='basic histogram') + else: + pdf.iplot(kind='histogram', filename='basic histogram') + + def plot_gps_cords(self, pdf, zoom=5): + marker_list = [] + center = None + for index, row in pdf.iterrows(): + if center is None: + center = [row["latitude"], row["longitude"]] + marker_list.append(Marker(location=(row["latitude"], row["longitude"]))) + + m = Map(center=(center), zoom=zoom) + marker_cluster = MarkerCluster( + markers=(marker_list) + ) + m.add_layer(marker_cluster); + return m + diff --git a/cerebralcortex/core/plotting/stress_plots.py b/cerebralcortex/core/plotting/stress_plots.py new file mode 100644 index 0000000..837c4cd --- /dev/null +++ b/cerebralcortex/core/plotting/stress_plots.py @@ -0,0 +1,178 @@ +# Copyright (c) 2019, MD2K Center of Excellence +# - Nasir Ali +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import plotly.plotly as py +import plotly.graph_objs as go +import pandas as pd +import random +from datetime import datetime, timedelta +import cufflinks as cf +from plotly.offline import iplot, init_notebook_mode +import pandas as pd +from datetime import datetime +import plotly.figure_factory as ff + + +class StressStreamPlots(): + def plot_pie(self, pdf, group_by_column=None): + pdf=pdf.groupby(str(group_by_column), as_index=False).agg('count') + labels=[] + values=[] + for index, row in pdf.iterrows(): + labels.append(row["stresser_main"]) + values.append(row["density"]) + + trace = go.Pie(labels=labels, values=values) + iplot([trace], filename='stresser_pie_chart') + + def plot_gantt(self, pdf): + data=[] + for index, row in pdf.iterrows(): + data.append(dict(Task=row["stresser_sub"], Start=row["start_time"], Finish=row["end_time"], Resource=row["stresser_main"])) + + fig = ff.create_gantt(data, index_col='Resource', title='Stressers, Main & Sub Categories', + show_colorbar=True, bar_width=0.8, showgrid_x=True, showgrid_y=True) + fig['layout']['yaxis'].update({"showticklabels":False}) + iplot(fig, filename='gantt-hours-minutes') + + def plot_sankey(self, df,cat_cols=[], value_cols='',title="Stressors' Sankey Diagram"): + labelList = [] + + for catCol in cat_cols: + labelListTemp = list(set(df[catCol].values)) + labelList = labelList + labelListTemp + + # remove duplicates from labelList + labelList = list(dict.fromkeys(labelList)) + + # transform df into a source-target pair + for i in range(len(cat_cols)-1): + if i==0: + sourceTargetDf = df[[cat_cols[i],cat_cols[i+1],value_cols]] + sourceTargetDf.columns = ['source','target','density'] + else: + tempDf = df[[cat_cols[i],cat_cols[i+1],value_cols]] + tempDf.columns = ['source','target','density'] + sourceTargetDf = pd.concat([sourceTargetDf,tempDf]) + sourceTargetDf = sourceTargetDf.groupby(['source','target']).agg({'density':'mean'}).reset_index() + + # add index for source-target pair + sourceTargetDf['sourceID'] = sourceTargetDf['source'].apply(lambda x: labelList.index(x)) + sourceTargetDf['targetID'] = sourceTargetDf['target'].apply(lambda x: labelList.index(x)) + + # creating the sankey diagram + data = dict( + type='sankey', + node = dict( + pad = 15, + thickness = 20, + line = dict( + color = "black", + width = 0.5 + ), + label = labelList + ), + link = dict( + source = sourceTargetDf['sourceID'], + target = sourceTargetDf['targetID'], + value = sourceTargetDf['density'] + ) + ) + + layout = dict( + title = title, + font = dict( + size = 10 + ) + ) + + fig = dict(data=[data], layout=layout) + iplot(fig, validate=False) + + def plot_bar(self, pdf, x_axis_column=None): + grouped_pdf=pdf.groupby(["user",x_axis_column], as_index=False).agg('mean') + user_ids = pdf.groupby("user", as_index=False).last() + + data = [] + + for index, row in user_ids.iterrows(): + sub=grouped_pdf.loc[grouped_pdf['user'] == row["user"]] + sub.sort_values(x_axis_column) + + data.append(go.Bar({ + 'y': sub["density"], + 'x': 
sub[x_axis_column], + 'name': row["user"] + })) + + layout = go.Layout( + title="All Participants' Stress Levels By Each Stressors", + yaxis=dict( + title='Average Stress Density' + ) + ) + fig = go.Figure(data=data, layout=layout) + iplot(fig, filename='basic-line') + + def plot_comparison(self, pdf, x_axis_column=None, usr_id=None, compare_with="all"): + data = [] + if usr_id: + usr_data = pdf.loc[pdf['user'] == str(usr_id)] + + if compare_with =="all" or compare_with is None: + compare_with_data = pdf.loc[pdf['user'] != str(usr_id)] + else: + compare_with_data = pdf.loc[pdf['user'] == str(compare_with)] + + grouped_user_pdf=usr_data.groupby([x_axis_column], as_index=False).agg('mean') + grouped_compare_with_pdf=compare_with_data.groupby([x_axis_column], as_index=False).agg('mean') + + data.append(go.Bar({ + 'y': grouped_user_pdf["density"], + 'x': grouped_user_pdf[x_axis_column], + 'name': usr_id + })) + if compare_with=="all": + compare_with = "All Participants" + data.append(go.Bar({ + 'y': grouped_compare_with_pdf["density"], + 'x': grouped_compare_with_pdf[x_axis_column], + 'name': compare_with + })) + + layout = go.Layout( + title="Comparison of Stress Levels Amongst Participants", + yaxis=dict( + title='Average Stress Density' + ) + ) + fig = go.Figure(data=data, layout=layout) + iplot(fig, filename='basic-line') + + #iplot(data, filename='basic-line') + + else: + raise Exception("usr_id cannot be None/Blank.") + diff --git a/cerebralcortex/data_importer/data_parsers/util.py b/cerebralcortex/data_importer/data_parsers/util.py index 25b3bbd..c3d38a1 100644 --- a/cerebralcortex/data_importer/data_parsers/util.py +++ b/cerebralcortex/data_importer/data_parsers/util.py @@ -27,7 +27,7 @@ from cerebralcortex.data_importer.util.helper_methods import rename_column_name -def assign_column_names_types(df: pd, metadata: dict = None) -> pd: +def assign_column_names_types_strict(df: pd, metadata: dict = None) -> pd: """ Change column names to the names defined in metadata->data_descriptor block @@ -75,7 +75,7 @@ def assign_column_names_types(df: pd, metadata: dict = None) -> pd: return df -def assign_column_names_types_strict(df: pd, metadata: dict = None) -> pd: +def assign_column_names_types(df: pd, metadata: dict = None) -> pd: """ Change column names to the names defined in metadata->data_descriptor block @@ -90,18 +90,18 @@ def assign_column_names_types_strict(df: pd, metadata: dict = None) -> pd: new_column_names = {0: "timestamp", 1: "localtime"} if metadata is not None: - data_desciptor = metadata.get("data_descriptor", []) + data_desciptor = metadata.data_descriptor if isinstance(data_desciptor, dict): data_desciptor = [data_desciptor] for dd in data_desciptor: - name = rename_column_name(dd.get("name", "", )) - metadata_columns.append({"name": name, "type": dd.get("data_type", "")}) + name = rename_column_name(dd.name) + metadata_columns.append({"name": name, "type": dd.type}) if len(metadata_columns) > 0: col_no = 2 # first two column numbers are timestamp and offset for mc in metadata_columns: - new_column_names[col_no] = mc["name"] + new_column_names[col_no] = mc.get("name") col_no += 1 else: for column in df: diff --git a/cerebralcortex/data_importer/ingest.py b/cerebralcortex/data_importer/ingest.py index c8118cd..262fea8 100644 --- a/cerebralcortex/data_importer/ingest.py +++ b/cerebralcortex/data_importer/ingest.py @@ -46,7 +46,7 @@ warnings.simplefilter(action='ignore', category=FutureWarning) -def import_file(cc_config: dict, user_id: str, file_path: str, 
allowed_streamname_pattern: str = None, compression: str = None, header: int = None, +def import_file(cc_config: dict, user_id: str, file_path: str, allowed_streamname_pattern: str = None, ignore_streamname_pattern:str=None, compression: str = None, header: int = None, metadata: Metadata = None, metadata_parser: Callable = None, data_parser: Callable = None): """ Import a single file and its metadata into cc-storage. @@ -54,7 +54,9 @@ def import_file(cc_config: dict, user_id: str, file_path: str, allowed_streamnam Args: cc_config (str): cerebralcortex config directory user_id (str): user id. Currently import_dir only supports parsing directory associated with a user - file_path (str): file path. + file_path (str): file path + allowed_streamname_pattern (str): (optional) regex of stream-names to be processed only + ignore_streamname_pattern (str): (optional) regex of stream-names to be ignored during ingestion process compression (str): pass compression name if csv files are compressed header (str): (optional) row number that must be used to name columns. None means file does not contain any header metadata (Metadata): (optional) Same metadata will be used for all the data files if this parameter is passed. If metadata is passed then metadata_parser cannot be passed. @@ -116,13 +118,14 @@ def import_file(cc_config: dict, user_id: str, file_path: str, allowed_streamnam metadata_dict = metadata.to_json() except Exception as e: fault_description = "metadata object is not valid: " + str(e) - sql_data.add_ingestion_log(user_id=user_id, stream_name=metadata_dict.get("name", "no-name"), - file_path=file_path, fault_type="CANNOT_PARSE_METADATA_FILE", - fault_description=fault_description, success=0) + sql_data.add_ingestion_log(user_id=user_id, stream_name=metadata_dict.get("name", "no-name"), + file_path=file_path, fault_type="CANNOT_PARSE_METADATA_FILE", + fault_description=fault_description, success=0) else: raise Exception("Invalid metadata") else: + metadata_dict = {} try: file_ext = os.path.splitext(file_path)[1] @@ -132,15 +135,15 @@ def import_file(cc_config: dict, user_id: str, file_path: str, allowed_streamnam metadata = md.read() metadata = metadata.lower() metadata_dict = json.loads(metadata) - cmm = sql_data.get_corrected_metadata(stream_name=metadata_dict.get("name")) - if cmm.get("status")!="include": - fault_description = "Ignored stream ingestion: "+str(metadata_dict.get("name"))+". Criteria: "+cmm.get("status") - sql_data.add_ingestion_log(user_id=user_id, stream_name=metadata_dict.get("name", "no-name"), - file_path=file_path, fault_type="IGNORED_STREAM", - fault_description=fault_description, success=0) - return False - if cmm.get("metadata"): - metadata_dict = cmm.get("metadata") + # cmm = sql_data.get_corrected_metadata(stream_name=metadata_dict.get("name")) + # if cmm.get("status","")!="include" and metadata_parser is not None and 'mcerebrum' in metadata_parser.__name__: + # fault_description = "Ignored stream: "+str(metadata_dict.get("name"))+". 
Criteria: "+cmm.get("status", "") + # sql_data.add_ingestion_log(user_id=user_id, stream_name=metadata_dict.get("name", "no-name"), + # file_path=file_path, fault_type="IGNORED_STREAM", + # fault_description=fault_description, success=0) + # return False + # if cmm.get("metadata"): + # metadata_dict = cmm.get("metadata") #metadata = Metadata().from_json_file(metadata_dict) except Exception as e: fault_description = "read/parse metadata: " + str(e) @@ -171,15 +174,27 @@ def import_file(cc_config: dict, user_id: str, file_path: str, allowed_streamnam file_path=file_path, fault_type="MISSING_METADATA_FIELDS", fault_description=fault_description, success=0) return False - if metadata_parser is not None and metadata_parser.__name__ == 'mcerebrum_metadata_parser': + if metadata_parser is not None and 'mcerebrum' in metadata_parser.__name__: stream_metadata = metadata["stream_metadata"] else: stream_metadata = metadata + if ignore_streamname_pattern is not None: + try: + ignore_streamname_pattern = re.compile(ignore_streamname_pattern) + ignore_streamname = ignore_streamname_pattern.search(stream_metadata.name) + if ignore_streamname: + return False + except: + raise Exception("ignore_streamname_pattern regular expression is not valid.") + if allowed_streamname_pattern is not None: try: allowed_streamname_pattern = re.compile(allowed_streamname_pattern) is_blacklisted = allowed_streamname_pattern.search(stream_metadata.name) + if is_blacklisted is None: + return False + except: raise Exception("allowed_streamname_pattern regular expression is not valid.") else: @@ -201,7 +216,7 @@ def import_file(cc_config: dict, user_id: str, file_path: str, allowed_streamnam "" + str(e) sql_data.add_ingestion_log(user_id=user_id, stream_name=stream_metadata.name, file_path=file_path, fault_type="PARTIAL_CORRUPT_DATA_FILE", fault_description=fault_description, success=0) - if compression is not None: + elif compression is not None: df = pd.read_csv(file_path, compression=compression, delimiter="\n", header=header, quotechar='"') else: df = pd.read_csv(file_path, delimiter="\n", header=header, quotechar='"') @@ -228,11 +243,10 @@ def import_file(cc_config: dict, user_id: str, file_path: str, allowed_streamnam fault_type="CANNOT_PARSE_DATA_FILE", fault_description=fault_description, success=0) return False - - df = assign_column_names_types(df, metadata_dict) + df = assign_column_names_types(df, Metadata().from_json_file(metadata_dict)) # save metadata/data - if metadata_parser is not None and metadata_parser.__name__ == 'mcerebrum_metadata_parser': + if metadata_parser is not None and 'mcerebrum' in metadata_parser.__name__: platform_data = metadata_dict.get("execution_context", {}).get("platform_metadata", "") if platform_data: @@ -242,7 +256,22 @@ def import_file(cc_config: dict, user_id: str, file_path: str, allowed_streamnam save_data(df=platform_df, cc_config=cc_config, user_id=user_id, stream_name=metadata["platform_metadata"].name) try: - df = df.dropna() + df = df.dropna() # TODO: Handle NaN cases and don't drop it + total_metadata_dd_columns = len(metadata["stream_metadata"].data_descriptor) + + # first two columns are timestamp and localtime in mcerbrum data. For all other data, first column "should be" timestamp + if 'mcerebrum' in data_parser.__name__: + total_df_columns = len(df.columns.tolist())-2 + else: + total_df_columns = len(df.columns.tolist())-1 + + if total_metadata_dd_columns!=total_df_columns: + fault_description = "Metadata and Data column missmatch. 
Total Metadata columns " + str(total_metadata_dd_columns) +". Total data columsn: "+str(total_df_columns) + sql_data.add_ingestion_log(user_id=user_id, stream_name=metadata_dict.get("name", "no-name"), + file_path=file_path, fault_type="NUMBER_OF_COLUMN_MISSMATCH", + fault_description=fault_description, success=0) + return False + save_data(df=df, cc_config=cc_config, user_id=user_id, stream_name=metadata["stream_metadata"].name) sql_data.save_stream_metadata(metadata["stream_metadata"]) sql_data.add_ingestion_log(user_id=user_id, stream_name=metadata_dict.get("name", "no-name"), @@ -277,18 +306,20 @@ def save_data(df: object, cc_config: dict, user_id: str, stream_name: str): """ df["version"] = 1 df["user"] = str(user_id) - table = pa.Table.from_pandas(df) + table = pa.Table.from_pandas(df,nthreads=1) partition_by = ["version", "user"] if cc_config["nosql_storage"] == "filesystem": data_file_url = os.path.join(cc_config["filesystem"]["filesystem_path"], "stream="+str(stream_name)) - # data_file_url = os.path.join("/home/ali/IdeaProjects/MD2K_DATA/tmp/", "stream=" + str(stream_name), "version=1", - # "user=" + str(user_id)) pq.write_to_dataset(table, root_path=data_file_url, partition_cols=partition_by, preserve_index=False) + elif cc_config["nosql_storage"] == "hdfs": - raw_files_dir = cc_config['hdfs']['raw_files_dir'] + data_file_url = os.path.join(cc_config["hdfs"]["raw_files_dir"], "stream="+str(stream_name)) fs = pa.hdfs.connect(cc_config['hdfs']['host'], cc_config['hdfs']['port']) - with fs.open(raw_files_dir, "wb") as fw: - pq.write_table(table, fw, partition_cols=partition_by, preserve_index=False) + pq.write_to_dataset(table, root_path=data_file_url, filesystem=fs, partition_cols=partition_by, preserve_index=False) + + else: + raise Exception(str(cc_config["nosql_storage"])+" is not supported yet. Please check your cerebralcortex configs (nosql_storage).") + def print_stats_table(ingestion_stats: dict): @@ -313,8 +344,8 @@ def print_stats_table(ingestion_stats: dict): def import_dir(cc_config: dict, input_data_dir: str, user_id: str = None, data_file_extension: list = [], - allowed_filename_pattern: str = None, - allowed_streamname_pattern: str = None, + allowed_filename_pattern: str = None, allowed_streamname_pattern: str = None, + ignore_streamname_pattern: str = None, batch_size: int = None, compression: str = None, header: int = None, metadata: Metadata = None, metadata_parser: Callable = None, data_parser: Callable = None, gen_report: bool = False): @@ -326,7 +357,9 @@ def import_dir(cc_config: dict, input_data_dir: str, user_id: str = None, data_f input_data_dir (str): data directory path user_id (str): user id. Currently import_dir only supports parsing directory associated with a user data_file_extension (list[str]): (optional) provide file extensions (e.g., .doc) that must be ignored - allowed_filename_pattern (list[str]): (optional) regex of files that must be processed. + allowed_filename_pattern (str): (optional) regex of files that must be processed. + allowed_streamname_pattern (str): (optional) regex of stream-names to be processed only + ignore_streamname_pattern (str): (optional) regex of stream-names to be ignored during ingestion process batch_size (int): (optional) using this parameter will turn on spark parallelism. batch size is number of files each worker will process compression (str): pass compression name if csv files are compressed header (str): (optional) row number that must be used to name columns. 
None means file does not contain any header @@ -353,23 +386,26 @@ def import_dir(cc_config: dict, input_data_dir: str, user_id: str = None, data_f CC = Kernel(cc_config, enable_spark=enable_spark) cc_config = CC.config - if input_data_dir[:1] != "/": + if input_data_dir[-1:] != "/": input_data_dir = input_data_dir + "/" - processed_files_list = CC.SqlData.get_processed_files_list() + #processed_files_list = CC.SqlData.get_processed_files_list() for file_path in all_files: - if not file_path in processed_files_list: - if data_parser.__name__ == "mcerebrum_data_parser": + if not CC.SqlData.is_file_processed(file_path): + if 'mcerebrum' in data_parser.__name__: user_id = file_path.replace(input_data_dir, "")[:36] if batch_size is None: import_file(cc_config=cc_config, user_id=user_id, file_path=file_path, compression=compression, + allowed_streamname_pattern=allowed_streamname_pattern, ignore_streamname_pattern=ignore_streamname_pattern, header=header, metadata=metadata, metadata_parser=metadata_parser, data_parser=data_parser) else: if len(batch_files) > batch_size or tmp_user_id != user_id: rdd = CC.sparkContext.parallelize(batch_files) rdd.foreach(lambda file_path: import_file(cc_config=cc_config, user_id=user_id, file_path=file_path, + allowed_streamname_pattern=allowed_streamname_pattern, + ignore_streamname_pattern=ignore_streamname_pattern, compression=compression, header=header, metadata=metadata, metadata_parser=metadata_parser, data_parser=data_parser)) print("Total Files Processed:", len(batch_files)) @@ -384,7 +420,7 @@ def import_dir(cc_config: dict, input_data_dir: str, user_id: str = None, data_f rdd.foreach(lambda file_path: import_file(cc_config=cc_config, user_id=user_id, file_path=file_path, compression=compression, header=header, metadata=metadata, metadata_parser=metadata_parser, data_parser=data_parser)) - print("Total Files Processed:", len(batch_files)) + print("Last Batch\n","Total Files Processed:", len(batch_files)) if gen_report: print_stats_table(CC.SqlData.get_ingestion_stats()) diff --git a/cerebralcortex/data_importer/metadata_parsers/mcerebrum.py b/cerebralcortex/data_importer/metadata_parsers/mcerebrum.py index 9404ab4..f41db92 100644 --- a/cerebralcortex/data_importer/metadata_parsers/mcerebrum.py +++ b/cerebralcortex/data_importer/metadata_parsers/mcerebrum.py @@ -90,10 +90,10 @@ def get_platform_metadata(metadata: dict) -> Metadata: Returns: Metadata: Metadata class object """ - stream_name = metadata.get("name", "name_not_available") + stream_name = metadata.get("name", "") execution_context = metadata.get("execution_context") platform_metadata = execution_context.get("platform_metadata", {}) # dict - application_metadata = execution_context["application_metadata"] # dict + application_metadata = execution_context.get("application_metadata", {}) # dict wrist = "" if platform_metadata.get("name", "") != "": if "left" in stream_name: diff --git a/cerebralcortex/test_suite/test_dataframes.py b/cerebralcortex/test_suite/test_dataframes.py index 10c7a76..b65a286 100644 --- a/cerebralcortex/test_suite/test_dataframes.py +++ b/cerebralcortex/test_suite/test_dataframes.py @@ -1,31 +1,45 @@ from cerebralcortex.core.util.spark_helper import get_or_create_sc +from cerebralcortex.test_suite.util.data_helper import gen_phone_battery_data +from pyspark.sql.functions import pandas_udf, PandasUDFType +from pyspark.sql import functions as F +from typing import List +import re + + lst1 = [ - [1, 10,20, 6], - [2,20,30, 7], - [3, 30,40,6], - [4,40,50,7], - [5,50,60,6] + [1, 
202, 10,20, 6], + [2,202,20,30, 7], + [3,202, 30,40,6], + [4,203,40,50,7], + [5,203,50,80,6] ] lst2 = [ - [10, 1,2,3], - [12, 4,5,6], - [21, 23,45,12], - [35,12,22,77], - [42,3,1,7], - [48,12,44,22], - [58,6,4,2] + [10,202, 1,2,3], + [12, 202,4,5,6], + [21, 202,23,45,12], + [35,202,12,22,77], + [42,203,3,1,7], + [48,203,12,44,22], + [58,203,6,4,2] ] sqlContext = get_or_create_sc(type="sqlContext") -df1 = sqlContext.createDataFrame(lst1, schema=['id', 'st','et','val']) -df2 = sqlContext.createDataFrame(lst1, schema=['id', 'v1','v2','v3']) - -df1.show(truncate=False) +df1 = sqlContext.createDataFrame(lst1, schema=['id', 'usr', 'st','et','val']) +df2 = sqlContext.createDataFrame(lst2, schema=['id', 'usr', 'v1','v2','v3']) -df2.show(truncate=False) +# df1.show(truncate=False) +# +# df2.show(truncate=False) +# +# #df3 = df1.join(df2, [(df1.et <= df2.id) & (df1.st >= df2.id)]) +# dd = [str(df2.v2._jc),str(df1.et._jc)] +# df3=df1.join(df2, df2.id.between(F.col("st"), F.col("et"))) +# df3.show(truncate=False) -df3 = df1.join(df2, [(df1.et <= df2.id) & (df1.st >= df2.id)]) +@pandas_udf("double", PandasUDFType.GROUPED_AGG) # doctest: +SKIP +def mean_udf(v): + return v.mean() -df3.show(truncate=False) \ No newline at end of file +win = F.window("timestamp", windowDuration=windowDuration, slideDuration=slideDuration, startTime=startTime) \ No newline at end of file diff --git a/cerebralcortex/test_suite/test_main.py b/cerebralcortex/test_suite/test_main.py index 8d3c4d1..693d140 100755 --- a/cerebralcortex/test_suite/test_main.py +++ b/cerebralcortex/test_suite/test_main.py @@ -34,7 +34,7 @@ from cerebralcortex.test_suite.test_stream import DataStreamTest -class TestCerebralCortex(unittest.TestCase, DataStreamTest, SqlStorageTest, TestObjectStorage): +class TestCerebralCortex(unittest.TestCase, DataStreamTest):#, SqlStorageTest, TestObjectStorage): def setUp(self): """ diff --git a/cerebralcortex/test_suite/test_stream.py b/cerebralcortex/test_suite/test_stream.py index ea53c7c..62124a8 100644 --- a/cerebralcortex/test_suite/test_stream.py +++ b/cerebralcortex/test_suite/test_stream.py @@ -24,8 +24,9 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
from datetime import datetime - +from pyspark.sql import functions as F from cerebralcortex.core.datatypes import DataStream +from cerebralcortex.core.metadata_manager.stream import Metadata from cerebralcortex.test_suite.util.data_helper import gen_phone_battery_data, gen_phone_battery_metadata @@ -44,57 +45,95 @@ def test_01_save_stream(self): self.assertEqual(result, True) - def test_02_stream(self): - all_streams = self.CC.list_streams() - searched_streams = self.CC.search_stream(stream_name="battery") - - self.assertEqual(len(all_streams),1) - self.assertEqual(all_streams[0].name,self.stream_name) - self.assertEqual(all_streams[0].metadata_hash,self.metadata_hash) - - self.assertEqual(len(searched_streams),2) - self.assertEqual(searched_streams[0],self.stream_name) - - def test_04_test_datafram_operations(self): + # def test_02_stream(self): + # all_streams = self.CC.list_streams() + # searched_streams = self.CC.search_stream(stream_name="battery") + # + # self.assertEqual(len(all_streams),1) + # self.assertEqual(all_streams[0].name,self.stream_name) + # self.assertEqual(all_streams[0].metadata_hash,self.metadata_hash) + # + # self.assertEqual(len(searched_streams),2) + # self.assertEqual(searched_streams[0],self.stream_name) + # + # def test_04_test_datafram_operations(self): + # ds = self.CC.get_stream(self.stream_name) + # avg_ds = ds.compute_average() + # data = avg_ds.collect() + # self.assertEqual(len(data),17) + # self.assertEqual(data[0][2],92.18333333333334) + # + # ds = self.CC.get_stream(self.stream_name) + # window_ds = ds.window() + # data = window_ds.collect() + # self.assertEqual(len(data),17) + # self.assertEqual(len(data[0][2]), 60) + + + + def test_05_map_window_to_stream(self): + def get_val(lst): + return lst[0] + sum_vals_udf = F.udf(get_val) ds = self.CC.get_stream(self.stream_name) - avg_ds = ds.compute_average() - data = avg_ds.collect() - self.assertEqual(len(data),17) - self.assertEqual(data[0][2],92.18333333333334) - - ds = self.CC.get_stream(self.stream_name) - window_ds = ds.window() - data = window_ds.collect() - self.assertEqual(len(data),17) - self.assertEqual(len(data[0][2]), 60) - - print("done") - def test_03_get_stream(self): - """ - Test functionality related to get a stream - - """ - ds = self.CC.get_stream(self.stream_name) - data = ds.data - metadata = ds.metadata[0] - - datapoint = data.take(1) - - self.assertEqual(datapoint[0][0], datetime(2019, 1, 9, 11, 49, 28)) - self.assertEqual(datapoint[0][1], 92) - self.assertEqual(datapoint[0][2], 1) - self.assertEqual(datapoint[0][3], self.user_id) - self.assertEqual(data.count(), 999) - - self.assertEqual(len(metadata.data_descriptor), 1) - self.assertEqual(len(metadata.modules), 1) - - self.assertEqual(metadata.metadata_hash, self.metadata_hash) - self.assertEqual(metadata.name, self.stream_name) - self.assertEqual(metadata.version, int(self.stream_version)) - self.assertEqual(metadata.data_descriptor[0].name, 'battery_level') - self.assertEqual(metadata.data_descriptor[0].type, 'LongType') - self.assertEqual(metadata.data_descriptor[0].attributes.get("description"), 'current battery charge') - self.assertEqual(metadata.modules[0].name, 'battery') - self.assertEqual(metadata.modules[0].version, '1.2.4') - self.assertEqual(metadata.modules[0].authors[0].get("test_user"), 'test_user@test_email.com') + win_ds = ds.window() + + # convert window stream as quality stream for next step + win_df=win_ds.data.withColumn("some_val", sum_vals_udf(win_ds.data["battery_level"])).drop("battery_level") + + from 
pyspark.sql.functions import pandas_udf, PandasUDFType + import pandas as pd + from pyspark.sql.types import StructField, StructType, StringType, FloatType + schema = StructType([ + StructField("mean", FloatType()), + StructField("val_1", FloatType()), + StructField("val_2", FloatType()) + ]) + @pandas_udf(schema, PandasUDFType.GROUPED_MAP) # doctest: +SKIP + def mean_udf(v): + print(v.dtypes) + all_cols = [99,23,1.3,2.5] + df = pd.DataFrame(all_cols, columns=['mean', 'val_1', 'val_2']) + return df + win_ds=DataStream(data=win_ds.data.drop("window"), metadata=Metadata()) + new_ds = win_ds.groupby("user").compute(mean_udf) + print(new_ds.data.columns) + sd = new_ds.collect() + df = win_df.withColumn("quality", F.when(win_df.some_val > 97, 1).otherwise(0)).drop("some_val") + win_quality_ds = DataStream(data=df, metadata=Metadata()) + + mapped_stream = ds.map_stream(win_quality_ds) + filtered_stream = mapped_stream.filter("quality", "=", 0) + bad_quality = filtered_stream.collect() + self.assertEqual(len(bad_quality.data), 710) + + + # def test_03_get_stream(self): + # """ + # Test functionality related to get a stream + # + # """ + # ds = self.CC.get_stream(self.stream_name) + # data = ds.data + # metadata = ds.metadata[0] + # + # datapoint = data.take(1) + # + # self.assertEqual(datapoint[0][0], datetime(2019, 1, 9, 11, 49, 28)) + # self.assertEqual(datapoint[0][1], 92) + # self.assertEqual(datapoint[0][2], 1) + # self.assertEqual(datapoint[0][3], self.user_id) + # self.assertEqual(data.count(), 999) + # + # self.assertEqual(len(metadata.data_descriptor), 1) + # self.assertEqual(len(metadata.modules), 1) + # + # self.assertEqual(metadata.metadata_hash, self.metadata_hash) + # self.assertEqual(metadata.name, self.stream_name) + # self.assertEqual(metadata.version, int(self.stream_version)) + # self.assertEqual(metadata.data_descriptor[0].name, 'battery_level') + # self.assertEqual(metadata.data_descriptor[0].type, 'LongType') + # self.assertEqual(metadata.data_descriptor[0].attributes.get("description"), 'current battery charge') + # self.assertEqual(metadata.modules[0].name, 'battery') + # self.assertEqual(metadata.modules[0].version, '1.2.4') + # self.assertEqual(metadata.modules[0].authors[0].get("test_user"), 'test_user@test_email.com') diff --git a/conf/cerebralcortex.yml b/conf/cerebralcortex.yml index 9d9b9cc..9b4b1a3 100644 --- a/conf/cerebralcortex.yml +++ b/conf/cerebralcortex.yml @@ -6,20 +6,21 @@ cc: throw_exception: False #if this is set to True then application will get exceptions auth_token_expire_time: 40000 #seconds auth_encryption_key: "md@k" + log_files_path: "" ######################################################################### # NoSQL Storage # ######################################################################### -nosql_storage: filesystem +nosql_storage: hdfs filesystem: # in case of nosql_store=filesystem, provide directory path where all processed-data shall be stored - filesystem_path: "/home/ali/IdeaProjects/MD2K_DATA/tmp/" + filesystem_path: "/home/ali/IdeaProjects/MD2K_DATA/rice_data/processed/" -#hdfs: -# host: localhost -# port: 9001 -# raw_files_dir: "cerebralcortex/" +hdfs: + host: localhost + port: 9001 + raw_files_dir: "/user/ali/cc33/" ######################################################################### # OBJECT Storage # @@ -28,6 +29,15 @@ filesystem: object_storage: object_storage_path: "/home/ali/IdeaProjects/MD2K_DATA/tmp/" +#minio: # AWS-S3 UPDATE +# host: s3.amazonaws.com # for amazon pass s3.amazonaws.com and for minio simpley pass url 
of minio server +# port: 9000 +# access_key: '' +# secret_key: '' +# input_bucket_name: 'cerebralcortex-mperf' # required for aws-s3 +# output_bucket_name: 'cerebralcortex-mperf-output' # required for aws-s3 +# dir_prefix: 'cerebralcortex/data/' +# secure: False #ssl ######################################################################### # Relational Storage # @@ -47,23 +57,34 @@ mysql: ######################################################################### ###################### Visualization Data Storage ####################### ######################################################################### -visualization_storage: influxdb +visualization_storage: none -influxdb: - host: 127.0.0.1 - port: 8086 - database: cerebralcortex_raw - db_user: "" - db_pass: "" +#influxdb: +# host: 127.0.0.1 +# port: 8086 +# database: cerebralcortex_raw +# db_user: "" +# db_pass: "" ######################################################################### ########################### Messaging Service ########################### ######################################################################### -messaging_service: kafka +messaging_service: none -kafka: - host: 127.0.0.1 - port: 9092 - # ping__kafka: how often CC-kafka shall check for new messages (in seconds) - ping_kafka: 5 - consumer_group_id: "md2k-data-ingestion" +#kafka: +# host: 127.0.0.1 +# port: 9092 +# ping_kafka: 5 +# consumer_group_id: "md2k" +######################################################################### +# Data Provenance Storage # +######################################################################### +provenance: none + +mprove: +# host: 127.0.0.1 # change if MySQL is installed remotely +# port: 8088 # Change if MySQL is not using default port +# username: 'sample' # Change +# pass: 'default' # Change +# namespace: 'http://mprov.md2k.org' +# graph_name: 'mProv-graph' diff --git a/requirements.txt b/requirements.txt index b3ff71a..6654048 100755 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ influxdb==5.2.1 pyarrow==0.11.1 pympler==0.5 hdfs3==0.3.0 -pyspark==2.4.0 +pyspark==2.4.3 msgpack==0.6.1 PyJWT==1.7.1 pandas==0.24.1 @@ -16,4 +16,8 @@ texttable numpy==1.16.1 geopy==1.18.1 Shapely==1.6.4.post2 -scikit-learn==0.20.2 \ No newline at end of file +scikit-learn==0.20.2 +plotly==3.10.0 +matplotlib +cufflinks==0.16 +ipyleaflet \ No newline at end of file diff --git a/setup.py b/setup.py index e6c9a38..6e53855 100755 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ setup( name="cerebralcortex-kernel", - version='3.0.0r22', + version='3.0.0r26', description='Backend data analytics platform for MD2K software', long_description=long_description,
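
Two notes on the test code added above. The GROUPED_MAP pandas_udf introduced in test_05_map_window_to_stream builds its pandas DataFrame from a flat four-element list against a three-column schema, which pandas will reject as a shape mismatch, and the trailing F.window(...) line in test_dataframes.py references windowDuration/slideDuration/startTime variables that are never defined in that file. Below is a minimal, self-contained sketch of the same grouped-map pattern in plain PySpark 2.4; the column names, values, and the summarize UDF are illustrative only (not part of the patch), and DataStream.groupby(...).compute(...) is replaced here by the underlying groupby(...).apply(...).

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.master("local[1]").appName("grouped_map_demo").getOrCreate()

# Toy stand-in for a windowed battery stream: one numeric value per user per row.
df = spark.createDataFrame(
    [("u1", 92.0), ("u1", 90.0), ("u2", 71.0)],
    ["user", "battery_level"],
)

# Output schema of the grouped-map UDF: one summary row per user group.
schema = StructType([
    StructField("user", StringType()),
    StructField("mean", DoubleType()),
    StructField("n_rows", DoubleType()),
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def summarize(pdf):
    # pdf holds every row of one "user" group as a pandas DataFrame;
    # the returned frame must match `schema` column-for-column.
    return pd.DataFrame(
        [[pdf["user"].iloc[0], float(pdf["battery_level"].mean()), float(len(pdf))]],
        columns=["user", "mean", "n_rows"],
    )

df.groupby("user").apply(summarize).show(truncate=False)

# The dangling window call in test_dataframes.py would need concrete durations, e.g.:
# win = F.window("timestamp", windowDuration="60 seconds", slideDuration="60 seconds")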