-
Notifications
You must be signed in to change notification settings - Fork 77
/
taxiStream.dos
136 lines (128 loc) · 8.52 KB
/
taxiStream.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/******** Streaming prediction ********/
// Prerequisite: run taxiTrain.dos first so that the data is persisted and the models are trained.
undef(all)
clearAllCache()
go;
// Tear down whatever a previous run left behind. Every call is guarded on its own,
// so a failure in one step (for example a subscription or table that is not there)
// is printed and the remaining cleanup steps still run.
try{ unsubscribeTable(, `orderTable, `saveToDisk) } catch(ex) { print(ex) }
try{ unsubscribeTable(, `traitTable, `predict) } catch(ex) { print(ex) }
try{ unsubscribeTable(, `orderTable, `orderProcess) } catch(ex) { print(ex) }
try{ dropStreamTable(`orderTable) } catch(ex) { print(ex) }
try{ dropStreamTable(`traitTable) } catch(ex) { print(ex) }
try{ dropStreamTable(`predictTable) } catch(ex) { print(ex) }
go;
// Create the stream tables that simulate receiving taxi order / trip data.
orderColName = `id`vendor_id`pickup_datetime`passenger_count`pickup_longitude`pickup_latitude`dropoff_longitude`dropoff_latitude`store_and_fwd_flag
orderColType = `SYMBOL`INT`DATETIME`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`CHAR
traitColName = `id`vendor_id`pickup_datetime`passenger_count`pickup_longitude`pickup_latitude`dropoff_longitude`dropoff_latitude`store_and_fwd_flag`store_and_fwd_flag_int`pickup_weekday`pickup_hour_weekofyear`pickup_hour`pickup_minute`pickup_week_hour`pca_testpick_0`pca_testpick_1`pca_testdrop_0`pca_testdrop_1`pickup_cluster`dropoff_cluster`distance_haversine`distance_dummy_manhattan`direction`pca_manhattan`center_latitude`center_longitude`avg_speed_h_gby_pickup_hour`avg_speed_m_gby_pickup_hour`log_trip_duration_gby_pickup_hour`avg_speed_h_gby_pickup_date`avg_speed_m_gby_pickup_date`log_trip_duration_gby_pickup_date`avg_speed_h_gby_pickup_week_hour`avg_speed_m_gby_pickup_week_hour`log_trip_duration_gby_pickup_week_hour`avg_speed_h_gby_pickup_cluster`avg_speed_m_gby_pickup_cluster`log_trip_duration_gby_pickup_cluster`avg_speed_h_gby_dropoff_cluster`avg_speed_m_gby_dropoff_cluster`log_trip_duration_gby_dropoff_cluster
// The feature-table types are assembled group by group so that every type lines up
// with its name in traitColName (a flat literal is easy to shift by one position,
// which would silently store fractional features such as pca_testpick_0 as INT):
//   the 9 order columns                                      -> same as orderColType
//   store_and_fwd_flag_int ... pickup_week_hour   (6 cols)   -> INT
//   pca_testpick_0 ... pca_testdrop_1             (4 cols)   -> DOUBLE
//   pickup_cluster, dropoff_cluster               (2 cols)   -> INT
//   distance_haversine ... center_longitude       (6 cols)
//     plus the 15 *_gby_* aggregate columns                  -> DOUBLE (21 cols)
traitColType = orderColType
traitColType.append!(take(`INT, 6))
traitColType.append!(take(`DOUBLE, 4))
traitColType.append!(take(`INT, 2))
traitColType.append!(take(`DOUBLE, 21))
predictColName = `id`vendor_id`pickup_datetime`passenger_count`pickup_longitude`pickup_latitude`dropoff_longitude`dropoff_latitude`store_and_fwd_flag`duration
predictColType = `SYMBOL`INT`DATETIME`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`CHAR`DOUBLE
// Order table: raw incoming orders.
share streamTable(100000:0, orderColName, orderColType) as orderTable
// Feature table: orders enriched with the engineered features.
share streamTable(100000:0, traitColName, traitColType) as traitTable
// Prediction table: orders together with the predicted duration.
share streamTable(100000:0, predictColName, predictColType) as predictTable
// Haversine distance between two latitude/longitude points.
//   lat1_, lng1_ : latitude / longitude of the first point, in degrees
//   lat2_, lng2_ : latitude / longitude of the second point, in degrees
// Returns the distance in kilometres, using an average earth radius of 6371 km.
def haversine_array(lat1_, lng1_, lat2_, lng2_) {
    // Convert all four inputs from degrees to radians in one step.
    radLat1, radLng1, radLat2, radLng2 = double(deg2rad([lat1_, lng1_, lat2_, lng2_]))
    earthRadiusKm = 6371
    halfLatSin = sin((radLat2 - radLat1) * 0.5)
    halfLngSin = sin((radLng2 - radLng1) * 0.5)
    // Haversine term: sin^2(dLat/2) + cos(lat1) * cos(lat2) * sin^2(dLng/2)
    hav = halfLatSin * halfLatSin + cos(radLat1) * cos(radLat2) * halfLngSin * halfLngSin
    return 2 * earthRadiusKm * asin(sqrt(hav))
}
// "Manhattan" distance between two latitude/longitude points: the sum of two
// haversine legs that each start at (lat1, lng1).
//   leg 1 keeps the latitude at lat1 and moves the longitude to lng2
//   leg 2 keeps the longitude at lng1 and moves the latitude to lat2
// Inputs are in degrees; the result is in the unit returned by haversine_array.
def dummy_manhattan_distance(lat1, lng1, lat2, lng2) {
    alongLongitude = haversine_array(lat1, lng1, lat1, lng2)
    alongLatitude = haversine_array(lat1, lng1, lat2, lng1)
    return alongLongitude + alongLatitude
}
// Bearing (direction) from the first latitude/longitude point to the second.
//   lat1_, lng1_ : latitude / longitude of the first point, in degrees
//   lat2_, lng2_ : latitude / longitude of the second point, in degrees
// Returns the angle in degrees as computed by rad2deg(atan(y / x)).
// NOTE(review): atan(y / x) does not distinguish opposite quadrants the way a
// two-argument arctangent would. It is kept as is because the trained model
// presumably saw features computed the same way — confirm against taxiTrain.dos
// before changing it.
def bearing_array(lat1_, lng1_, lat2_, lng2_) {
    lng_delta_rad = deg2rad(lng2_ - lng1_)
    // Only the two latitudes are needed in radians; the longitudes enter the
    // formula solely through their difference above.
    lat1 = deg2rad(lat1_)
    lat2 = deg2rad(lat2_)
    y = sin(lng_delta_rad) * cos(lat2)
    x = cos(lat1) * sin(lat2) - sin(lat1) * cos(lat2) * cos(lng_delta_rad)
    return rad2deg(atan(y/x))
}
// Data processing and feature generation. This function is used as the handler of
// the subscription on the order stream table and is called with each received batch.
//   traitTable : table the finished feature rows are appended to
//   hisData    : historical feature table; supplies the mean pickup/dropoff position
//                used to centre the PCA input and the per-group averages joined below
//   msg        : the incoming batch of order rows, as a table
// Returns traitTable after the new rows have been appended.
// NOTE(review): the PCA matrix, the KMeans model and the 15 group-by averages depend
// only on the model files and on hisData, yet they are rebuilt on every call;
// precomputing them once would require changing how the handler is constructed.
def process(mutable traitTable, mutable hisData, msg) {
    t = msg
    sz = size(msg)
    pca_component = loadText("./taxidata/PCA.model")
    kmeans_model = loadModel("./taxidata/KMeans.model")

    // Flag and calendar features derived from the raw order columns.
    t[`store_and_fwd_flag_int] = iif(t[`store_and_fwd_flag] == 'N', int(0), int(1))
    // pickup_date is only needed as a join key further down and is dropped before output.
    t['pickup_date'] = date(t['pickup_datetime'])
    t['pickup_weekday'] = weekday(t['pickup_datetime'])
    t['pickup_hour_weekofyear'] = weekOfYear(t['pickup_datetime'])
    t['pickup_hour'] = hour(t['pickup_datetime'])
    t['pickup_minute'] = int(minute(t['pickup_datetime']))
    t['pickup_week_hour'] = t['pickup_weekday'] * 24 + t['pickup_hour']

    // PCA projection: centre each position on the historical mean, then multiply
    // by the loaded component matrix.
    pca_component = matrix(pca_component)
    avg_pick_pos = matrix(avg(table(hisData[`pickup_latitude] as latitude, hisData[`pickup_longitude] as longitude)))
    avg_drop_pos = matrix(avg(table(hisData[`dropoff_latitude] as latitude, hisData[`dropoff_longitude] as longitude)))
    pca_pick = dot((matrix(table(t[`pickup_latitude] as latitude, t[`pickup_longitude] as longitude)) - repmat(avg_pick_pos, sz, 1)), pca_component)
    pca_drop = dot((matrix(table(t[`dropoff_latitude] as latitude, t[`dropoff_longitude] as longitude)) - repmat(avg_drop_pos, sz, 1)), pca_component)
    t[`pca_testpick_0] = flatten(pca_pick[:, 0])
    t[`pca_testpick_1] = flatten(pca_pick[:, 1])
    t[`pca_testdrop_0] = flatten(pca_drop[:, 0])
    t[`pca_testdrop_1] = flatten(pca_drop[:, 1])

    // Cluster assignment of the pickup and dropoff positions.
    t['pickup_cluster'] = kmeans_model.predict(select pickup_latitude, pickup_longitude from t)
    t['dropoff_cluster'] = kmeans_model.predict(select dropoff_latitude, dropoff_longitude from t)

    // Distance, direction and midpoint features.
    t['distance_haversine'] = haversine_array(t['pickup_latitude'], t['pickup_longitude'], t['dropoff_latitude'], t['dropoff_longitude'])
    t['distance_dummy_manhattan'] = dummy_manhattan_distance(t['pickup_latitude'], t['pickup_longitude'], t['dropoff_latitude'], t['dropoff_longitude'])
    t['direction'] = bearing_array(t['pickup_latitude'], t['pickup_longitude'], t['dropoff_latitude'], t['dropoff_longitude'])
    t['pca_manhattan'] = abs(t['pca_testdrop_1'] - t['pca_testpick_1']) + abs(t['pca_testdrop_0'] - t['pca_testpick_0'])
    t['center_latitude'] = (t['pickup_latitude'] + t['dropoff_latitude']) / 2
    t['center_longitude'] = (t['pickup_longitude'] + t['dropoff_longitude']) / 2

    // For every (grouping column, metric) pair, average the metric over hisData per
    // group and left-join it onto the batch as <metric>_gby_<grouping column>.
    for(gby_col in ['pickup_hour', 'pickup_date', 'pickup_week_hour', 'pickup_cluster', 'dropoff_cluster']) {
        for(gby_para in ['avg_speed_h', 'avg_speed_m', 'log_trip_duration']) {
            gby = groupby(avg, hisData[gby_para], hisData[gby_col])
            gby.rename!(`avg_ + gby_para, gby_para + '_gby_' + gby_col)
            t = lsj(t, gby, gby_col)
        }
    }

    t.dropColumns!(`pickup_date)
    traitTable.append!(t)
    return traitTable
}
// Subscribe to the order table: each batch is turned into PCA / KMeans based
// features by process() and written to the feature table.
// Historical feature data, read from the DFS table with a select *.
hisData = select * from loadTable("dfs://taxi", `traitData)
// Bind the output table and the historical data; the subscription supplies the message.
orderHandler = process{traitTable, hisData}
subscribeTable(tableName="orderTable", actionName="orderProcess", offset=0, handler=orderHandler, msgAsTable=true, batchSize=1, throttle=1, hash=0, reconnect=true)
// Predict the trip duration with the saved XGBoost model.
//   predictTable : table the prediction rows are appended to
//   msg          : batch of feature rows, as a table
// Returns predictTable after the new rows have been appended.
def predictDuration(mutable predictTable, msg) {
    // The model input is the batch without the identifier, timestamp and
    // character flag columns.
    modelInput = msg
    modelInput.dropColumns!(`id`pickup_datetime`store_and_fwd_flag)
    xgboost_model = xgboost::loadModel("./taxidata/XGBoost.model")
    predicted = xgboost::predict(xgboost_model, modelInput)
    // Output rows: the original order columns plus the predicted duration.
    result = select id, vendor_id, pickup_datetime, passenger_count, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, store_and_fwd_flag, predicted as duration from msg
    predictTable.append!(result)
    return predictTable
}
// Subscribe to the feature stream table: each batch is scored by the XGBoost
// handler, which writes the predicted trip durations to predictTable.
predictHandler = predictDuration{predictTable}
subscribeTable(tableName="traitTable", actionName="predict", offset=0, handler=predictHandler, msgAsTable=true, hash=1, reconnect=true)
// Replay the stored test data into orderTable to simulate orders arriving in real time.
data = select * from loadTable("dfs://taxi", `testData)
// The replay runs as a background job so the script can continue past this line.
submitJob("replay", "trade", replay{inputTables=data, outputTables=orderTable, dateColumn=`pickup_datetime, timeColumn=`pickup_datetime, replayRate=25, absoluteRate=true, parallelLevel=1})
// Persist the real-time order data to the DFS database.
db = database("dfs://taxi")
// Start from an empty target table on every run.
if(existsTable("dfs://taxi", "newData")) { dropTable(db, "newData") }
orderSchema = orderTable.schema().colDefs
// compressMethods is keyed by column name, so the delta method is requested for
// pickup_datetime, the temporal column of the order schema (the schema has no
// column called "datetime").
db.createPartitionedTable(table=table(1:0, orderSchema.name, orderSchema.typeString), tableName=`newData, partitionColumns=`pickup_datetime, sortColumns=`pickup_datetime, compressMethods={pickup_datetime:"delta"})
// Batches from orderTable are appended straight into the DFS table.
subscribeTable(tableName="orderTable", actionName="saveToDisk", offset=0, handler=loadTable("dfs://taxi", "newData"), msgAsTable=true, batchSize=100000, throttle=1, reconnect=true)