-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMongoGliderDataExporter.cpp
139 lines (114 loc) · 4.54 KB
/
MongoGliderDataExporter.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#include "MongoGliderDataExporter.hpp"
#include <memory>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <boost/algorithm/string.hpp>
/**
 * Builds an exporter for one glider.
 *
 * @param _hostString  MongoDB host string, later handed to DBClientConnection::connect().
 * @param _gliderName  Glider name; it becomes part of the collection names
 *                     ("GDAM.<gliderName>....") used when inserting.
 *
 * No connection is made here; that happens in open().
 */
MongoGliderDataExporter::MongoGliderDataExporter(string _hostString, string _gliderName) : hostString(_hostString), gliderName(_gliderName){
    // Start from a null connection pointer: close() deletes db unconditionally,
    // and deleting NULL is a no-op whereas deleting an uninitialised pointer is
    // undefined behaviour. Assigned in the body (not the initialiser list) so the
    // member declaration order in the header does not matter.
    db = NULL;
}
/**
 * Destructor. Performs no cleanup of its own: the connection object held in
 * db is only released by close(), so callers are expected to call close()
 * before the exporter goes away.
 */
MongoGliderDataExporter::~MongoGliderDataExporter()
{
}
// ScopedDbConnection in Mongo is wonky, so a plain DBClientConnection is created
// and connected in open() instead. May want to change this later...
/**
 * Remembers the flight/science file types and opens the MongoDB connection
 * used by processReading() and filesProcessed().
 *
 * @param flightFileType   Stored in fliFileType; forms part of the readings collection name.
 * @param flightHeaders    Not used by this exporter.
 * @param scienceFileType  Stored in sciFileType; forms part of the readings collection name.
 * @param scienceHeaders   Not used by this exporter.
 *
 * Connection problems are reported on stderr and are not propagated; db is
 * left pointing at the (unconnected) client object in that case.
 *
 * NOTE: db is overwritten without being deleted, so calling open() again
 * without an intervening close() leaks the previous connection object.
 */
void MongoGliderDataExporter::open(string flightFileType, vector < pair<string,string> >& flightHeaders, string scienceFileType, vector < pair<string,string> >& scienceHeaders){
    fliFileType = flightFileType;
    sciFileType = scienceFileType;

    fprintf(stdout,"Initializing Mongo Connection\r\n");
    db = new DBClientConnection();

    string errMsg;
    try{
        // The (host, errMsg) overload signals failure through its return value
        // and fills errMsg, so the result has to be checked explicitly.
        if(!db->connect(hostString,errMsg)){
            fprintf(stderr,"Database connection failed. Message: %s\r\n",errMsg.c_str());
        }
    } catch (const DBException &e){
        fprintf(stderr,"Database connection exception: %s. Message: %s\r\n",e.what(),errMsg.c_str());
    }

    // Timestamp later written as "date_processed" by filesProcessed().
    currentTime = time(NULL);
}
/**
 * Releases the MongoDB connection object created by open().
 *
 * The pointer is reset afterwards so that a repeated close() deletes NULL
 * (a no-op) instead of freeing the same object twice.
 */
void MongoGliderDataExporter::close(){
    fprintf(stdout,"Closing Mongo Connection\r\n");
    delete db;
    db = NULL;
}
/**
 * Folds a longitude/latitude pair of readings into a single "<base>lonlat"
 * field on the document being built.
 *
 * @param b         Builder that receives the combined field.
 * @param readings  Reading map; the "<base>lon-lon" and "<base>lat-lat"
 *                  entries are removed from it once they have been appended.
 * @param gpsField  Key whose presence triggers the extraction. The base name
 *                  is everything before the first "lon" (or, failing that,
 *                  the first "lat") in this key.
 *
 * Does nothing when gpsField is not present in readings. Because the values
 * are read with operator[], a missing counterpart entry contributes 0.
 */
void pullOutGPS(BSONObjBuilder& b, map<string,double>& readings,string gpsField){
    if(readings.count(gpsField) == 0){
        return;
    }

    // Locate where the base name ends: prefer "lon", fall back to "lat".
    size_t cut = gpsField.find("lon");
    if(cut == string::npos){
        cut = gpsField.find("lat");
    }

    const string prefix = gpsField.substr(0,cut);
    const string lonKey = prefix+"lon-lon";
    const string latKey = prefix+"lat-lat";

    // Longitude first, then latitude.
    BSONArrayBuilder coords(2);
    coords.append(readings[lonKey]);
    coords.append(readings[latKey]);
    b.append(prefix+"lonlat",coords.done());

    // The pair now lives in the combined field; drop the individual entries.
    readings.erase(lonKey);
    readings.erase(latKey);
}
/**
 * Converts one set of readings into a BSON document and inserts it into the
 * "GDAM.<gliderName>.<fliFileType><sciFileType>" collection.
 *
 * @param readings  Sensor name -> value. The map is modified: the timestamp
 *                  entries and any GPS pairs consumed by pullOutGPS() are erased.
 *
 * The document's "timestamp" comes from "m_present_time-timestamp", overridden
 * by "sci_m_present_time-timestamp" when that is present, and is 0 when
 * neither exists. Insert failures are logged to stderr and not propagated.
 */
void MongoGliderDataExporter::processReading(map<string,double>& readings){
    typedef map<string,double>::iterator ReadingIter;

    BSONObjBuilder doc;

    // Timestamp keys in order of increasing priority: a later hit overrides an earlier one.
    const char* const timeKeys[2] = {"m_present_time-timestamp","sci_m_present_time-timestamp"};
    double stamp = 0;
    for(int k = 0; k < 2; ++k){
        ReadingIter found = readings.find(timeKeys[k]);
        if(found != readings.end()){
            stamp = found->second;
            readings.erase(found);
        }
    }
    doc.appendTimeT("timestamp",static_cast<time_t>(stamp));

    // Collapse each known lon/lat pair into a single "<base>lonlat" field.
    const string lonKeys[] = {"m_gps_lon-lon","m_lon-lon","c_wpt_lon-lon"};
    const size_t lonKeyCount = sizeof(lonKeys)/sizeof(lonKeys[0]);
    for(size_t k = 0; k < lonKeyCount; ++k){
        pullOutGPS(doc,readings,lonKeys[k]);
    }

    // Whatever is left goes in verbatim, keyed by sensor name.
    for(ReadingIter entry = readings.begin(); entry != readings.end(); ++entry){
        doc.append(entry->first,entry->second);
    }

    const string collection = "GDAM."+gliderName+"."+fliFileType+sciFileType;
    try{
        db->insert(collection,doc.obj());
    } catch (DBException &e) {
        fprintf(stderr,"Unable to insert value: %s.\r\n",e.what());
    }
}
/**
 * Records every processed file in the "GDAM.<gliderName>.processed_files"
 * collection.
 *
 * @param fileNames  Paths of the files that were processed.
 *
 * For each path that can be stat()'ed, one document is inserted containing
 * the processing time (currentTime, set in open()), the stat access/modify/
 * status-change times, the file size in bytes, and the file name with any
 * directory part removed. Paths that cannot be stat()'ed and failed inserts
 * are reported on stderr and skipped.
 */
void MongoGliderDataExporter::filesProcessed(set<string>& fileNames){
    for(set<string>::iterator it = fileNames.begin(); it != fileNames.end(); ++it){
        const string& path = *it;

        struct stat fileDetails;
        if(stat(path.c_str(),&fileDetails) != 0){
            fprintf(stderr,"Cannot stat file at %s.\r\n",path.c_str());
            continue;
        }

        BSONObjBuilder b;
        b.appendTimeT("date_processed",static_cast<time_t>(this->currentTime));
        b.appendTimeT("access_time",fileDetails.st_atime);
        b.appendTimeT("modify_time",fileDetails.st_mtime);
        // st_ctime is the time of the last status change; the field keeps its
        // historical "create_time" name.
        b.appendTimeT("create_time",fileDetails.st_ctime);
        // A size is a byte count, not a point in time: store it as a 64-bit
        // integer rather than as a BSON date.
        b.append("file_size",static_cast<long long>(fileDetails.st_size));

        // Keep only the part after the last '/', or the whole path if there is none.
        const size_t lastSlash = path.rfind('/');
        const string name = (lastSlash != string::npos) ? path.substr(lastSlash+1) : path;
        b.append("name",name);

        try{
            db->insert("GDAM."+gliderName+".processed_files",b.obj());
        } catch (const DBException &e){
            fprintf(stderr,"Unable to insert value: %s.\r\n",e.what());
        }
    }
}
time_t MongoGliderDataExporter::getLastTime(){
DBClientConnection* timeConn = new DBClientConnection();
string errMsg;
try{
timeConn->connect(hostString,errMsg);
} catch (DBException &e){
fprintf(stderr,"Database connection exception: %s. Message: %s\r\n",e.what(),errMsg.c_str());
}
Query qu = BSONObj();
auto_ptr<DBClientCursor> cursor = timeConn->query("GDAM."+gliderName+".processed_files",qu.sort("modify_time",-1));
time_t retVal = 0;
while(cursor->more()){
BSONObj p = cursor->next();
retVal = p.getField("modify_time").Date().toTimeT();
break; // Only want first result
}
delete timeConn;
return retVal;
}