diff --git a/dev-resources/notes.clj b/dev-resources/notes.clj new file mode 100644 index 0000000..876f29c --- /dev/null +++ b/dev-resources/notes.clj @@ -0,0 +1,10 @@ +#!/usr/bin/env clj + +@mbp2:dev-resources $ clj +Clojure 1.10.0 +user=> (def mstar [{:dataset "mstar" :ticker "abc"} {:dataset "mstar" :ticker "ghf"}]) +#'user/mstar +user=> (def tingo [{:dataset "tingo" :ticker "xyz"} {:dataset "tingo" :ticker "abc"}]) +#'user/tingo +user=> (map vector mstar tingo) +([{:dataset "mstar", :ticker "abc"} {:dataset "tingo", :ticker "xyz"}] [{:dataset "mstar", :ticker "ghf"} {:dataset "tingo", :ticker "abc"}]) diff --git a/dev-resources/splitter.clj b/dev-resources/splitter.clj new file mode 100755 index 0000000..983fdca --- /dev/null +++ b/dev-resources/splitter.clj @@ -0,0 +1,184 @@ +#!/usr/bin/env clj + +(require '[clojure.pprint :as pprint]) + +(defn print-it [coll] + (pprint/pprint coll) + coll) + +(def data [{:dataset "ALPHA-VANTAGE", + :ticker + ["BRK-B" + "LON:FCH" + "SFTBF" + "TSM" + "NTDOY" + "NTDOF" + "VEMAX" + "VEURX" + "VEXPX" + "VGWAX" + "VITAX" + "VIMAX" + "VMRAX" + "VPACX" + "VGSLX" + "VTIAX" + "VTSAX" + "VWINX" + "VWENX" + "VWNDX" + "VMMXX" + "VWIGX" + "VINEX" + "VMMSX" + "VWUAX" + "VEXPX" + "VIMSX" + "VMRGX" + "VGTSX" + "VFH" + "VEA" + "VWO" + "VHT" + "VGT" + "IAU" + "GLD" + "FB" + "AMZN" + "GOOG" + "NVDA" + "CY" + "INTC" + "TXN" + "V" + "SAP" + "SQ" + "PYPL" + "BRK.B" + "AAPL" + "TWTR" + "GS" + "INTU"]} + {:dataset "TIINGO", + :ticker + ["BRK-B" + "FB" + "AMZN" + "GOOG" + "NVDA" + "CY" + "INTC" + "TXN" + "V" + "SAP" + "SQ" + "PYPL" + "AAPL" + "TWTR" + "GS" + "INTU" + "LON:FCH" + "SFTBF" + "TSM" + "NTDOY" + "NTDOF" + "VFH" + "VEA" + "VWO" + "VHT" + "VGT" + "IAU" + "GLD" + "VEMAX" + "VEURX" + "VEXPX" + "VGWAX" + "VITAX" + "VIMAX" + "VMRAX" + "VPACX" + "VGSLX" + "VTIAX" + "VTSAX" + "VWINX" + "VWENX" + "VWNDX" + "VMMXX" + "VWIGX" + "VINEX" + "VMMSX" + "VWUAX" + "VEXPX" + "VIMSX" + "VMRGX" + "VGTSX"]} + {:dataset "MSTAR", + :ticker + ["FB" + "AMZN" + "GOOG" + "NVDA" + "CY" + "INTC" + "TXN" + "V" + "SAP" + "SQ" + "PYPL" + "BRK.B" + "AAPL" + "TWTR" + "GS" + "INTU" + "LON:FCH" + "SFTBF" + "TSM" + "NTDOY" + "NTDOF" + "VFH" + "VEA" + "VWO" + "VHT" + "VGT" + "IAU" + "GLD" + "VEMAX" + "VEURX" + "VEXPX" + "VGWAX" + "VITAX" + "VIMAX" + "VMRAX" + "VPACX" + "VGSLX" + "VTIAX" + "VTSAX" + "VWINX" + "VWENX" + "VWNDX" + "VMMXX" + "VWIGX" + "VINEX" + "VMMSX" + "VWUAX" + "VEXPX" + "VIMSX" + "VMRGX" + "VGTSX"]}]) + +(defn split-it [m] + (let [d (:dataset m)] + (->> m + :ticker + (mapv #(assoc {} :dataset d :ticker %)) + #_print-it))) + +(->> data + (mapcat split-it) + ;vec + ;(map vector) + ;(partition-all 2 ) + ;flatten + doall + print-it) diff --git a/project.clj b/project.clj index ecafd58..037ff8c 100644 --- a/project.clj +++ b/project.clj @@ -3,6 +3,7 @@ :dependencies [[org.clojure/clojure "1.10.0"] [clj-http "3.9.1"] [clj-time "0.15.1"] + [com.climate/claypoole "1.1.4"] [environ "1.1.0"] [org.clojure/data.json "0.2.6"] [org.clojure/java.jdbc "0.7.9"] diff --git a/src/jobs/equities.clj b/src/jobs/equities.clj index c93b530..eacc153 100644 --- a/src/jobs/equities.clj +++ b/src/jobs/equities.clj @@ -2,6 +2,8 @@ (:require [clj-time.coerce :as coerce] [clj-time.core :as time] [clj-time.format :as format] + [clojure.core.reducers :as r] + [com.climate.claypoole :as cp] [clojure.data.json :as json] [clojure.java.jdbc :as jdbc] [clojure.tools.cli :as cli] @@ -310,9 +312,32 @@ :date util/joda-date->date-str)} query-params) - data (->> (concat alpha-vantage tiingo morningstar quandl) - (map #(api/get-data % query-params*)) - flatten)] - (execute! cxn data))) + r-data (fn [] + (->> (concat #_alpha-vantage tiingo morningstar quandl) + vec + (r/map (fn [m] (let [d (:dataset m)] + (->> m + :ticker + (map #(assoc {} :dataset d :ticker %)))))) + r/flatten + (r/map #(api/get-data % query-params*)) + r/flatten + r/foldcat)) + data (cp/with-shutdown! [pool (cp/threadpool 2)] + (->> (concat #_alpha-vantage tiingo morningstar #_quandl) + vec + (cp/pmap pool (fn [m] (let [d (:dataset m)] + (->> m + :ticker + (map #(assoc {} :dataset d :ticker %)))))) + flatten + (cp/pmap pool #(api/get-data % query-params*)) + (map #((> 2 (count %)))) + util/print-it + flatten + doall + ))] + (util/print-it data) + #_(execute! cxn data))) - (util/notify-healthchecks-io (-> :healthchecks-io-api-key env))) + #_(util/notify-healthchecks-io (-> :healthchecks-io-api-key env))) diff --git a/src/markets_etl/api.clj b/src/markets_etl/api.clj index bd7e130..39ac185 100644 --- a/src/markets_etl/api.clj +++ b/src/markets_etl/api.clj @@ -57,6 +57,7 @@ (query-alpha-vantage-api! ticker {})) ([url ticker paramz] {:pre [(every? true? (allowed? paramz))]} + (log/info "query-alpha-vantage! called") (Thread/sleep 5500) (let [params (dissoc paramz :limit) response (try @@ -107,6 +108,7 @@ (query-tiingo! ticker {})) ([ticker paramz] {:pre [(every? true? (allowed? paramz))]} + (log/info "query-tiingo! called") (let [params (dissoc paramz :limit) url (str (:protocol tiingo-api) (:url tiingo-api) @@ -133,7 +135,8 @@ (if (= 200 status) (-> body (json/read-str :key-fn (comp keyword string/lower-case))) - (log/error "Tiingo request, status:" status "Ticker:" ticker))))) + (do (log/error "Tiingo request, status:" status "Ticker:" ticker) + nil))))) (defn query-intrinio! ;; turning this off for now - as of Dec 2018 need to ([ticker] ;; pay for any data now @@ -163,13 +166,15 @@ (if (= 200 status) (-> body (json/read-str :key-fn keyword)) - (log/error "Intrinio request, exception:" status "Ticker:" ticker))))) + (do (log/error "Intrinio request, exception:" status "Ticker:" ticker) + nil))))) (defn query-morningstar! ([ticker] (query-morningstar! ticker {})) ([ticker paramz] {:pre [(every? true? (allowed? paramz))]} + (log/info "query-morningstar! called") (let [params (dissoc paramz :limit) url (str (:protocol morningstar-api) (:url morningstar-api) @@ -196,14 +201,16 @@ (if (and (= 200 status) ((comp not empty?) body')) (-> body' (json/read-str :key-fn (comp keyword string/lower-case))) - (log/error "Morningstar-api request, exception:" status - "Ticker:" ticker))))) + (do (log/error "Morningstar-api request, exception:" status + "Ticker:" ticker) + nil))))) (defn query-quandl! ([dataset ticker] (query-quandl! dataset ticker {})) ([dataset ticker paramz] {:pre [(every? true? (allowed? paramz))]} + (log/info "query-quandl! called") (let [url (str (:protocol quandl-api) (:url quandl-api) (str dataset "/") @@ -235,28 +242,22 @@ (defmethod get-data "TIINGO" [{:keys [dataset ticker]} query-params] - (->> ticker - (map (fn [tkr] - (->> (query-tiingo! tkr - query-params) - (map #(assoc % :dataset dataset :ticker tkr))))))) + (->> (query-tiingo! ticker + query-params) + (map #(assoc % :dataset dataset :ticker ticker)))) (defmethod get-data "MSTAR" [{:keys [dataset ticker]} query-params] - (->> ticker - (map (fn [tkr] - (-> (query-morningstar! tkr - query-params) - (assoc :dataset dataset :ticker tkr)))))) + (-> (query-morningstar! ticker + query-params) + (assoc :dataset dataset :ticker ticker))) (defmethod get-data "INTRINIO" [{:keys [dataset ticker]} query-params] - (->> ticker - (map (fn [tkr] - (-> (query-intrinio! tkr query-params) - (assoc :dataset dataset :ticker tkr)))))) + (-> (query-intrinio! ticker query-params) + (assoc :dataset dataset :ticker ticker))) (defmethod get-data "ALPHA-VANTAGE" [{:keys [dataset ticker]} @@ -266,17 +267,13 @@ alpha-vantage-dataset (if (clojure.set/subset? ticker' currencies) :currency :equities)] - (->> ticker - (map (fn [tkr] - (-> (query-alpha-vantage! {:endpoint alpha-vantage-dataset - :ticker tkr - :query-params query-params}) - (assoc :dataset dataset :ticker tkr))))))) + (-> (query-alpha-vantage! {:endpoint alpha-vantage-dataset + :ticker ticker + :query-params query-params}) + (assoc :dataset dataset :ticker ticker)))) (defmethod get-data :default [{:keys [dataset - ticker] :as m} + ticker]} query-params] - (->> ticker - (map (fn [tkr] - (-> (query-quandl! dataset tkr query-params) - (assoc :dataset dataset :ticker tkr)))))) + (-> (query-quandl! dataset ticker query-params) + (assoc :dataset dataset :ticker ticker)))