Stream Processor is a library and data processing tool that contains online implementations of algorithms designed to run on mCerebrum. This codebase also be run in a standalone fashion on most computing platforms.
It contains implementations of the following algorithms:
- cStress: A continuous stress assessment algorithm
- UbiComp 2015 cStress: Towards a Gold Standard for Continuous Stress Assessment in the Mobile Environment Karen Hovsepian, Mustafa al'absi, Emre Ertin, Thomas Kamarck, Motoshiro Nakajima, Santosh Kumar pdf
Clone repository git clone https://github.com/MD2Korg/stream-processor
or import into Intellij IDEA through New->Project from Version Control->Github
- Use this url
https://github.com/MD2Korg/stream-processor
- Check
Use auto-import
- Check
Create directories ofr empyt content roots automatically
- Choose
Use gralde wrapper task configuration
- Specify a Gradle JVM
(jdk>=1.7)
- Wait for Gradle to resolve dependencies and build project
- Define the
Project SDK
and add the same JDK you are utilizing from the previous step - Open
Edit Configurations
- Add
Application
- Main class:
CC_Main or Main
- Program Arguments:
- Directory to Cerebral Cortex data files
- Path to
cStressModelV5.json
- Path to
cStressModelRIPv4.json
- Path to
model_puffmarker.json
- Specify classpath of module:
streamm-processor_main
- Main class:
Import data for replay through Stream Processor
CSVParser tp = new CSVParser();
tp.importData(path + "/rip.txt", AUTOSENSE.CHEST_RIP);
tp.importData(path + "/ecg.txt", AUTOSENSE.CHEST_ECG);
tp.importData(path + "/accelx.txt", AUTOSENSE.CHEST_ACCEL_X);
tp.importData(path + "/accely.txt", AUTOSENSE.CHEST_ACCEL_Y);
tp.importData(path + "/accelz.txt", AUTOSENSE.CHEST_ACCEL_Z);
tp.sort();
Setup the Stream Processor object with a 60 second window, define a path for exporting data streams, and load the cStress model file.
int windowSize = 60000;
StreamProcessor streamProcessor = new StreamProcessor(windowSize);
streamProcessor.setPath(path);
streamProcessor.loadModel(cStressModelPath);
Define datapoint callbacks for DataPoint
and DataPointArray
.
streamProcessor.dpInterface = new DataPointInterface() {
@Override
public void dataPointHandler(String stream, DataPoint dp) {
System.out.println(path + "/" + stream + " " + dp);
}
@Override
public void dataPointArrayHandler(String stream, DataPointArray dp) {
System.out.println(path + "/" + stream + " " + dp);
}
};
Register callbacks for particular named data streams.
streamProcessor.registerCallbackDataArrayStream(StreamConstants.ORG_MD2K_CSTRESS_FV);
streamProcessor.registerCallbackDataStream(StreamConstants.ORG_MD2K_CSTRESS_DATA_ACCEL_ACTIVITY);
streamProcessor.registerCallbackDataStream(StreamConstants.ORG_MD2K_CSTRESS_PROBABILITY);
streamProcessor.registerCallbackDataStream(StreamConstants.ORG_MD2K_CSTRESS_STRESSLABEL);
Replay logic for sending data through Stream Processor and processing windows streamProcessor.go()
.
long windowStartTime = -1;
long st = -1;
int count = 0;
for (CSVDataPoint ap : tp) {
DataPoint dp = new DataPoint(ap.timestamp, ap.value);
if (windowStartTime < 0) {
windowStartTime = Time.nextEpochTimestamp(dp.timestamp, windowSize);
st = System.currentTimeMillis();
}
if ((dp.timestamp - windowStartTime) >= windowSize) { //Process the buffer every windowSize milliseconds
streamProcessor.go();
windowStartTime += windowSize;
}
streamProcessor.add(ap.channel, dp);
}
- 0.1.0 Initial release
- Timothy Hnat (twhnat) [email protected]
- Karen Hovsepian (karoaper) [email protected]
- Hillol Sarker (hillolsarker) [email protected]
MD2K is supported by the National Institutes of Health Big Data to Knowledge Initiative Grant #1U54EB020404
Team: Cornell Tech, GA Tech, U Memphis, Northwestern, Ohio State, Open mHealth, Rice, UCLA, UCSD, UCSF, U Mass, U Michigan, WVU