-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathCSI300.py
75 lines (60 loc) · 2.88 KB
/
CSI300.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import tushare as ts
from FakePriceDataLoader import FakePriceDataLoaderChina
from PriceData import PriceData
from SimpleMovingAverage import SimpleMovingAverage
import PriceDataLoader
import time
import threading
from datetime import datetime
class CSI300(object):
    """Drive realtime and history polling for one PriceData instance.

    Constructing an instance starts two background threads (one refreshing
    realtime data, one refreshing history data) and then blocks the calling
    thread until either worker finishes. Ctrl-C / SystemExit asks both
    workers to stop.
    """

    # Code handed to PriceData to identify the instrument.
    CSI300_CODE_IFENG = "sh000300"

    def __init__(self, realtime_polling_interval = 20):
        """Start both polling threads and block until they have stopped.

        Args:
            realtime_polling_interval: Seconds to pause between two realtime
                updates.
        """
        # Only needed by the fake data source below; kept so that the debug
        # line can be re-enabled without further edits.
        start_datetime = datetime(2017, 12, 27, 14, 49, 59)
        csv_base_name = "csi300"
        # Fake data source to debug realtime data updating and SMA calculation.
        #PriceDataLoader.price_data_loader_china = FakePriceDataLoaderChina(start_datetime, csv_base_name)
        self.price_data = PriceData(CSI300.CSI300_CODE_IFENG, csv_base_name)
        self.price_data.set_simple_moving_average(
            SimpleMovingAverage(SimpleMovingAverage.DEFAULT_PARAMS))
        self.realtime_polling_interval = realtime_polling_interval
        self.start_realtime_polling()
        self.start_histdata_polling()
        while True:
            try:
                # Short join timeouts keep the main thread responsive to
                # Ctrl-C while it waits for the workers.
                self.realtime_polling_thread.join(timeout = 5)
                self.histdata_polling_thread.join(timeout = 5)
                if not self.realtime_polling_thread.is_alive() or not self.histdata_polling_thread.is_alive():
                    print("At least one of the polling thread finished.")
                    self.stop_realtime_polling()
                    self.stop_histdata_polling()
                    break
            except (KeyboardInterrupt, SystemExit):
                print("Ctrl-C or SystemExit.")
                # No break here: the loop exits on a later iteration once a
                # worker has actually terminated.
                self.stop_realtime_polling()
                self.stop_histdata_polling()

    @staticmethod
    def _pause(stop_event, seconds):
        """Pause for up to `seconds`; return True if a stop was requested.

        When `stop_event` is None (the polling method was invoked without
        going through the matching start_* method) this is a plain sleep.
        """
        if stop_event is None:
            time.sleep(seconds)
            return False
        return stop_event.wait(seconds)

    def start_realtime_polling(self):
        """Launch the thread that runs realtime_price_polling()."""
        # The event must exist before the worker starts so it can wait on it.
        self._realtime_stop_event = threading.Event()
        self.realtime_polling = True
        self.realtime_polling_thread = threading.Thread(target = self.realtime_price_polling)
        self.realtime_polling_thread.start()

    def stop_realtime_polling(self):
        """Ask the realtime worker to stop and wake it if it is pausing."""
        self.realtime_polling = False
        # Setting the event interrupts the worker's pause, so it exits
        # promptly instead of sleeping out the rest of its interval.
        stop_event = getattr(self, "_realtime_stop_event", None)
        if stop_event is not None:
            stop_event.set()

    def start_histdata_polling(self):
        """Launch the thread that runs history_price_data_polling()."""
        self._histdata_stop_event = threading.Event()
        self.histdata_polling = True
        self.histdata_polling_thread = threading.Thread(target = self.history_price_data_polling)
        self.histdata_polling_thread.start()

    def stop_histdata_polling(self):
        """Ask the history worker to stop and wake it if it is pausing."""
        self.histdata_polling = False
        stop_event = getattr(self, "_histdata_stop_event", None)
        if stop_event is not None:
            stop_event.set()

    def realtime_price_polling(self):
        """Worker loop: refresh realtime data and print both summaries.

        Runs until self.realtime_polling becomes False, pausing
        self.realtime_polling_interval seconds between iterations.
        """
        stop_event = getattr(self, "_realtime_stop_event", None)
        while self.realtime_polling:
            self.price_data.update_realtime()
            self.price_data.print_realtime_price_summary()
            self.price_data.print_realtime_sma_summary()
            self._pause(stop_event, self.realtime_polling_interval)

    def history_price_data_polling(self):
        """Worker loop: once per second, update history data.

        Each update is given the current time reported by
        PriceDataLoader.price_data_loader_china. Runs until
        self.histdata_polling becomes False.
        """
        stop_event = getattr(self, "_histdata_stop_event", None)
        while self.histdata_polling:
            if self._pause(stop_event, 1):
                # Stop requested during the pause: skip the final update.
                break
            now_china = PriceDataLoader.price_data_loader_china.now()
            self.price_data.update_hist_data(now_china)
        # NOTE(review): the original author left the remark
        # "bug: update data for day" here; the details of that bug are not
        # recorded in this file.