Skip to content

Commit

Permalink
Some small changes to enable faster startup.
Browse files Browse the repository at this point in the history
  • Loading branch information
cnuernber committed Feb 3, 2024
1 parent 35efa1e commit 412b08a
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 58 deletions.
1 change: 0 additions & 1 deletion src/tech/v3/dataset/impl/column.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
[tech.v3.datatype.bitmap :refer [->bitmap] :as bitmap]
[tech.v3.datatype.packing :as packing]
[tech.v3.datatype.argops :as argops]
[tech.v3.tensor :as dtt]
[tech.v3.dataset.impl.column-base :as column-base]
[tech.v3.dataset.impl.column-data-process :as column-data-process]
[ham-fisted.lazy-noncaching :as lznc]
Expand Down
7 changes: 4 additions & 3 deletions src/tech/v3/dataset/io/column_parsers.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
[tech.v3.datatype.bitmap :as bitmap]
[tech.v3.datatype.errors :as errors]
[tech.v3.datatype.argops :as argops]
[tech.v3.datatype.datetime :as dtype-dt]
[tech.v3.datatype.protocols :as dtype-proto]
[ham-fisted.api :as hamf])
(:import [java.util UUID List]
Expand Down Expand Up @@ -254,6 +253,8 @@
unpacked-datatype formatter)]
[(make-safe-parser datetime (parser-fn v)) false]))

(defn- datetime-datatype?
  "True when `dt` is a datetime datatype.  Resolved via `requiring-resolve`
  inside the fn body so `tech.v3.datatype.datetime` is only loaded on first
  call - a `def` of `(requiring-resolve ...)` would still load that namespace
  at this namespace's load time, defeating the startup-time savings of
  removing it from the `ns` `:require` clause."
  [dt]
  ((requiring-resolve 'tech.v3.datatype.datetime/datetime-datatype?) dt))


(defn parser-entry->parser-tuple
[parser-kwd]
Expand All @@ -268,11 +269,11 @@
[(find-fixed-parser parser-datatype) true]
(instance? IFn parser-fn)
[parser-fn true]
(and (dtype-dt/datetime-datatype? parser-datatype)
(and (datetime-datatype? parser-datatype)
(string? parser-fn))
(datetime-formatter-parser-fn parser-datatype
(DateTimeFormatter/ofPattern parser-fn))
(and (dtype-dt/datetime-datatype? parser-datatype)
(and (datetime-datatype? parser-datatype)
(instance? DateTimeFormatter parser-fn))
(datetime-formatter-parser-fn parser-datatype parser-fn)
(= :text parser-datatype)
Expand Down
52 changes: 2 additions & 50 deletions src/tech/v3/dataset/io/csv.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(:require [charred.api :as charred]
[charred.bulk :as bulk]
[charred.coerce :as coerce]
[tech.v3.dataset.io.csv-min :as csv-min]
[tech.v3.dataset.io :as ds-io]
[tech.v3.parallel.for :as pfor]
[tech.v3.datatype :as dtype]
Expand All @@ -21,63 +22,14 @@
[java.util Iterator]))


(set! *warn-on-reflection* true)
(set! *unchecked-math* :warn-on-boxed)


(defn rows->dataset-fn
"Create an efficiently callable function to parse row-batches into datasets.
Returns function from row-iter->dataset. Options passed in here are the
same as ->dataset."
[{:keys [header-row?]
:or {header-row? true}
:as options}]
(fn [row-iter]
(let [row-iter (coerce/->iterator row-iter)
header-row (if (and header-row? (.hasNext row-iter))
(vec (.next row-iter))
[])
n-header-cols (count header-row)
{:keys [parsers col-idx->parser]}
(parse-context/options->col-idx-parse-context
options :string (fn [^long col-idx]
(when (< col-idx n-header-cols)
(header-row col-idx))))
n-records (get options :n-records (get options :num-rows))]
;;initialize parsers so if there are no more rows we get a dataset with
;;at least column names
(dotimes [idx n-header-cols]
(col-idx->parser idx))

(if n-records
(let [n-records (long n-records)]
(loop [continue? (.hasNext row-iter)
row-idx 0]
(if continue?
(do
(reduce (hamf-rf/indexed-accum
acc col-idx field
(-> (col-idx->parser col-idx)
(column-parsers/add-value! row-idx field)))
nil
(.next row-iter))
(recur (and (.hasNext row-iter)
(< (inc row-idx) n-records))
(unchecked-inc row-idx)))
(parse-context/parsers->dataset options parsers))))
(loop [continue? (.hasNext row-iter)
row-idx 0]
(if continue?
(do
(reduce (hamf-rf/indexed-accum
acc col-idx field
(-> (col-idx->parser col-idx)
(column-parsers/add-value! row-idx field)))
nil
(.next row-iter))
(recur (.hasNext row-iter) (unchecked-inc row-idx)))
(parse-context/parsers->dataset options parsers)))))))

(csv-min/rows->dataset-fn options))


(defn csv->dataset-seq
Expand Down
82 changes: 82 additions & 0 deletions src/tech/v3/dataset/io/csv_min.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
(ns tech.v3.dataset.io.csv-min
(:require [tech.v3.dataset.io.context :as parse-context]
[tech.v3.dataset.io.column-parsers :as column-parsers]
[charred.coerce :as coerce]
[charred.api :as charred]
[ham-fisted.reduce :as hamf-rf]))


(set! *warn-on-reflection* true)
(set! *unchecked-math* :warn-on-boxed)


(defn rows->dataset-fn
  "Create an efficiently callable function to parse row-batches into datasets.
  Returns function from row-iter->dataset.  Options passed in here are the
  same as ->dataset.

  Options used here:

  * `:header-row?` - defaults to true; when true the first row supplies column names.
  * `:n-records` (or `:num-rows`) - optional upper bound on the number of data
    rows parsed."
  [{:keys [header-row?]
    :or {header-row? true}
    :as options}]
  (fn [row-iter]
    (let [row-iter (coerce/->iterator row-iter)
          header-row (if (and header-row? (.hasNext row-iter))
                       (vec (.next row-iter))
                       [])
          n-header-cols (count header-row)
          {:keys [parsers col-idx->parser]}
          (parse-context/options->col-idx-parse-context
           options :string (fn [^long col-idx]
                             (when (< col-idx n-header-cols)
                               (header-row col-idx))))
          ;;Long/MAX_VALUE serves as "no limit" so a single loop handles both
          ;;the bounded and unbounded cases (previously two duplicated loops).
          n-records (long (or (get options :n-records (get options :num-rows))
                              Long/MAX_VALUE))]
      ;;initialize parsers so if there are no more rows we get a dataset with
      ;;at least column names
      (dotimes [idx n-header-cols]
        (col-idx->parser idx))
      (loop [continue? (.hasNext row-iter)
             row-idx 0]
        (if continue?
          (do
            ;;Feed each field of the current row into its column parser.
            (reduce (hamf-rf/indexed-accum
                     acc col-idx field
                     (-> (col-idx->parser col-idx)
                         (column-parsers/add-value! row-idx field)))
                    nil
                    (.next row-iter))
            (recur (and (.hasNext row-iter)
                        (< (inc row-idx) n-records))
                   (unchecked-inc row-idx)))
          (parse-context/parsers->dataset options parsers))))))


(defn csv->dataset
  "Read a csv into a dataset. Same options as [[tech.v3.dataset/->dataset]]."
  [input-path & [options]]
  (let [supplier (charred/read-csv-supplier (java.io.File. (str input-path))
                                            (merge {:profile :mutable} options))]
    (try
      (let [n-skip (get options :n-initial-skip-rows)
            ;;When skip rows are requested we must drop them from an explicit
            ;;iterator before handing the rows to the parser.
            row-source (if n-skip
                         (let [it (coerce/->iterator supplier)]
                           (dotimes [_ n-skip]
                             (when (.hasNext it) (.next it)))
                           it)
                         supplier)]
        ((rows->dataset-fn options) row-source))
      (finally
        ;;The supplier may hold an open file handle; close it regardless of
        ;;parse success.
        (when (instance? java.lang.AutoCloseable supplier)
          (.close ^java.lang.AutoCloseable supplier))))))
2 changes: 1 addition & 1 deletion src/tech/v3/dataset/io/datetime.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns tech.v3.dataset.io.datetime
"Helpful and well tested string->datetime pathways."
(:require [clojure.string :as s]
[tech.v3.datatype.datetime :as dtype-dt])
[tech.v3.datatype.datetime.constants :as dtype-dt])
(:import [java.time LocalDate LocalDateTime LocalTime
ZonedDateTime OffsetDateTime Instant Duration]
[java.time.format DateTimeFormatter DateTimeFormatterBuilder]))
Expand Down
5 changes: 2 additions & 3 deletions src/tech/v3/dataset/string_table.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
(ns tech.v3.dataset.string-table
(:require [tech.v3.datatype :as dtype]
[tech.v3.datatype.protocols :as dtype-proto]
(:require [tech.v3.datatype.protocols :as dtype-proto]
[tech.v3.datatype.base :as dtype-base]
[tech.v3.dataset.dynamic-int-list :as int-list]
[tech.v3.parallel.for :as parallel-for]
Expand Down Expand Up @@ -32,7 +31,7 @@
(clone [this]
;;We do not need to dedup any more; a java array is a more efficient
;;storage mechanism
(dtype/make-container :jvm-heap :string this))
(dtype-proto/make-container :jvm-heap :string nil this))
PStrTable
(get-str-table [_this] {:int->str int->str
:str->int str->int})
Expand Down

0 comments on commit 412b08a

Please sign in to comment.