Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

faster #40

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions dev-resources/notes.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/usr/bin/env clj

@mbp2:dev-resources $ clj
Clojure 1.10.0
user=> (def mstar [{:dataset "mstar" :ticker "abc"} {:dataset "mstar" :ticker "ghf"}])
#'user/mstar
user=> (def tingo [{:dataset "tingo" :ticker "xyz"} {:dataset "tingo" :ticker "abc"}])
#'user/tingo
user=> (map vector mstar tingo)
([{:dataset "mstar", :ticker "abc"} {:dataset "tingo", :ticker "xyz"}] [{:dataset "mstar", :ticker "ghf"} {:dataset "tingo", :ticker "abc"}])
184 changes: 184 additions & 0 deletions dev-resources/splitter.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
#!/usr/bin/env clj

(require '[clojure.pprint :as pprint])

(defn print-it [coll]
(pprint/pprint coll)
coll)

(def data [{:dataset "ALPHA-VANTAGE",
:ticker
["BRK-B"
"LON:FCH"
"SFTBF"
"TSM"
"NTDOY"
"NTDOF"
"VEMAX"
"VEURX"
"VEXPX"
"VGWAX"
"VITAX"
"VIMAX"
"VMRAX"
"VPACX"
"VGSLX"
"VTIAX"
"VTSAX"
"VWINX"
"VWENX"
"VWNDX"
"VMMXX"
"VWIGX"
"VINEX"
"VMMSX"
"VWUAX"
"VEXPX"
"VIMSX"
"VMRGX"
"VGTSX"
"VFH"
"VEA"
"VWO"
"VHT"
"VGT"
"IAU"
"GLD"
"FB"
"AMZN"
"GOOG"
"NVDA"
"CY"
"INTC"
"TXN"
"V"
"SAP"
"SQ"
"PYPL"
"BRK.B"
"AAPL"
"TWTR"
"GS"
"INTU"]}
{:dataset "TIINGO",
:ticker
["BRK-B"
"FB"
"AMZN"
"GOOG"
"NVDA"
"CY"
"INTC"
"TXN"
"V"
"SAP"
"SQ"
"PYPL"
"AAPL"
"TWTR"
"GS"
"INTU"
"LON:FCH"
"SFTBF"
"TSM"
"NTDOY"
"NTDOF"
"VFH"
"VEA"
"VWO"
"VHT"
"VGT"
"IAU"
"GLD"
"VEMAX"
"VEURX"
"VEXPX"
"VGWAX"
"VITAX"
"VIMAX"
"VMRAX"
"VPACX"
"VGSLX"
"VTIAX"
"VTSAX"
"VWINX"
"VWENX"
"VWNDX"
"VMMXX"
"VWIGX"
"VINEX"
"VMMSX"
"VWUAX"
"VEXPX"
"VIMSX"
"VMRGX"
"VGTSX"]}
{:dataset "MSTAR",
:ticker
["FB"
"AMZN"
"GOOG"
"NVDA"
"CY"
"INTC"
"TXN"
"V"
"SAP"
"SQ"
"PYPL"
"BRK.B"
"AAPL"
"TWTR"
"GS"
"INTU"
"LON:FCH"
"SFTBF"
"TSM"
"NTDOY"
"NTDOF"
"VFH"
"VEA"
"VWO"
"VHT"
"VGT"
"IAU"
"GLD"
"VEMAX"
"VEURX"
"VEXPX"
"VGWAX"
"VITAX"
"VIMAX"
"VMRAX"
"VPACX"
"VGSLX"
"VTIAX"
"VTSAX"
"VWINX"
"VWENX"
"VWNDX"
"VMMXX"
"VWIGX"
"VINEX"
"VMMSX"
"VWUAX"
"VEXPX"
"VIMSX"
"VMRGX"
"VGTSX"]}])

(defn split-it [m]
(let [d (:dataset m)]
(->> m
:ticker
(mapv #(assoc {} :dataset d :ticker %))
#_print-it)))

(->> data
(mapcat split-it)
;vec
;(map vector)
;(partition-all 2 )
;flatten
doall
print-it)
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
:dependencies [[org.clojure/clojure "1.10.0"]
[clj-http "3.9.1"]
[clj-time "0.15.1"]
[com.climate/claypoole "1.1.4"]
[environ "1.1.0"]
[org.clojure/data.json "0.2.6"]
[org.clojure/java.jdbc "0.7.9"]
Expand Down
35 changes: 30 additions & 5 deletions src/jobs/equities.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
(:require [clj-time.coerce :as coerce]
[clj-time.core :as time]
[clj-time.format :as format]
[clojure.core.reducers :as r]
[com.climate.claypoole :as cp]
[clojure.data.json :as json]
[clojure.java.jdbc :as jdbc]
[clojure.tools.cli :as cli]
Expand Down Expand Up @@ -310,9 +312,32 @@
:date
util/joda-date->date-str)}
query-params)
data (->> (concat alpha-vantage tiingo morningstar quandl)
(map #(api/get-data % query-params*))
flatten)]
(execute! cxn data)))
r-data (fn []
(->> (concat #_alpha-vantage tiingo morningstar quandl)
vec
(r/map (fn [m] (let [d (:dataset m)]
(->> m
:ticker
(map #(assoc {} :dataset d :ticker %))))))
r/flatten
(r/map #(api/get-data % query-params*))
r/flatten
r/foldcat))
data (cp/with-shutdown! [pool (cp/threadpool 2)]
(->> (concat #_alpha-vantage tiingo morningstar #_quandl)
vec
(cp/pmap pool (fn [m] (let [d (:dataset m)]
(->> m
:ticker
(map #(assoc {} :dataset d :ticker %))))))
flatten
(cp/pmap pool #(api/get-data % query-params*))
(map #((> 2 (count %))))
util/print-it
flatten
doall
))]
(util/print-it data)
#_(execute! cxn data)))

(util/notify-healthchecks-io (-> :healthchecks-io-api-key env)))
#_(util/notify-healthchecks-io (-> :healthchecks-io-api-key env)))
55 changes: 26 additions & 29 deletions src/markets_etl/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
(query-alpha-vantage-api! ticker {}))
([url ticker paramz]
{:pre [(every? true? (allowed? paramz))]}
(log/info "query-alpha-vantage! called")
(Thread/sleep 5500)
(let [params (dissoc paramz :limit)
response (try
Expand Down Expand Up @@ -107,6 +108,7 @@
(query-tiingo! ticker {}))
([ticker paramz]
{:pre [(every? true? (allowed? paramz))]}
(log/info "query-tiingo! called")
(let [params (dissoc paramz :limit)
url (str (:protocol tiingo-api)
(:url tiingo-api)
Expand All @@ -133,7 +135,8 @@
(if (= 200 status)
(-> body
(json/read-str :key-fn (comp keyword string/lower-case)))
(log/error "Tiingo request, status:" status "Ticker:" ticker)))))
(do (log/error "Tiingo request, status:" status "Ticker:" ticker)
nil)))))

(defn query-intrinio! ;; turning this off for now - as of Dec 2018 need to
([ticker] ;; pay for any data now
Expand Down Expand Up @@ -163,13 +166,15 @@
(if (= 200 status)
(-> body
(json/read-str :key-fn keyword))
(log/error "Intrinio request, exception:" status "Ticker:" ticker)))))
(do (log/error "Intrinio request, exception:" status "Ticker:" ticker)
nil)))))

(defn query-morningstar!
([ticker]
(query-morningstar! ticker {}))
([ticker paramz]
{:pre [(every? true? (allowed? paramz))]}
(log/info "query-morningstar! called")
(let [params (dissoc paramz :limit)
url (str (:protocol morningstar-api)
(:url morningstar-api)
Expand All @@ -196,14 +201,16 @@
(if (and (= 200 status) ((comp not empty?) body'))
(-> body'
(json/read-str :key-fn (comp keyword string/lower-case)))
(log/error "Morningstar-api request, exception:" status
"Ticker:" ticker)))))
(do (log/error "Morningstar-api request, exception:" status
"Ticker:" ticker)
nil)))))

(defn query-quandl!
([dataset ticker]
(query-quandl! dataset ticker {}))
([dataset ticker paramz]
{:pre [(every? true? (allowed? paramz))]}
(log/info "query-quandl! called")
(let [url (str (:protocol quandl-api)
(:url quandl-api)
(str dataset "/")
Expand Down Expand Up @@ -235,28 +242,22 @@
(defmethod get-data "TIINGO" [{:keys [dataset
ticker]}
query-params]
(->> ticker
(map (fn [tkr]
(->> (query-tiingo! tkr
query-params)
(map #(assoc % :dataset dataset :ticker tkr)))))))
(->> (query-tiingo! ticker
query-params)
(map #(assoc % :dataset dataset :ticker ticker))))

(defmethod get-data "MSTAR" [{:keys [dataset
ticker]}
query-params]
(->> ticker
(map (fn [tkr]
(-> (query-morningstar! tkr
query-params)
(assoc :dataset dataset :ticker tkr))))))
(-> (query-morningstar! ticker
query-params)
(assoc :dataset dataset :ticker ticker)))

(defmethod get-data "INTRINIO" [{:keys [dataset
ticker]}
query-params]
(->> ticker
(map (fn [tkr]
(-> (query-intrinio! tkr query-params)
(assoc :dataset dataset :ticker tkr))))))
(-> (query-intrinio! ticker query-params)
(assoc :dataset dataset :ticker ticker)))

(defmethod get-data "ALPHA-VANTAGE" [{:keys [dataset
ticker]}
Expand All @@ -266,17 +267,13 @@
alpha-vantage-dataset (if (clojure.set/subset? ticker' currencies)
:currency
:equities)]
(->> ticker
(map (fn [tkr]
(-> (query-alpha-vantage! {:endpoint alpha-vantage-dataset
:ticker tkr
:query-params query-params})
(assoc :dataset dataset :ticker tkr)))))))
(-> (query-alpha-vantage! {:endpoint alpha-vantage-dataset
:ticker ticker
:query-params query-params})
(assoc :dataset dataset :ticker ticker))))

(defmethod get-data :default [{:keys [dataset
ticker] :as m}
ticker]}
query-params]
(->> ticker
(map (fn [tkr]
(-> (query-quandl! dataset tkr query-params)
(assoc :dataset dataset :ticker tkr))))))
(-> (query-quandl! dataset ticker query-params)
(assoc :dataset dataset :ticker ticker)))