From 412b08a1927526449f784aad55007b0cd29656b4 Mon Sep 17 00:00:00 2001
From: Chris Nuernberger
Date: Sat, 3 Feb 2024 11:22:22 -0700
Subject: [PATCH] Some small changes to enable faster startup.

---
 src/tech/v3/dataset/impl/column.clj       |  1 -
 src/tech/v3/dataset/io/column_parsers.clj |  7 +-
 src/tech/v3/dataset/io/csv.clj            | 52 +-------------
 src/tech/v3/dataset/io/csv_min.clj        | 82 +++++++++++++++++++++
 src/tech/v3/dataset/io/datetime.clj       |  2 +-
 src/tech/v3/dataset/string_table.clj      |  5 +-
 6 files changed, 91 insertions(+), 58 deletions(-)
 create mode 100644 src/tech/v3/dataset/io/csv_min.clj

diff --git a/src/tech/v3/dataset/impl/column.clj b/src/tech/v3/dataset/impl/column.clj
index 2df090543..c5a4ba9a7 100644
--- a/src/tech/v3/dataset/impl/column.clj
+++ b/src/tech/v3/dataset/impl/column.clj
@@ -10,7 +10,6 @@
             [tech.v3.datatype.bitmap :refer [->bitmap] :as bitmap]
             [tech.v3.datatype.packing :as packing]
             [tech.v3.datatype.argops :as argops]
-            [tech.v3.tensor :as dtt]
             [tech.v3.dataset.impl.column-base :as column-base]
             [tech.v3.dataset.impl.column-data-process :as column-data-process]
             [ham-fisted.lazy-noncaching :as lznc]
diff --git a/src/tech/v3/dataset/io/column_parsers.clj b/src/tech/v3/dataset/io/column_parsers.clj
index 004b8783b..9a50a89cb 100644
--- a/src/tech/v3/dataset/io/column_parsers.clj
+++ b/src/tech/v3/dataset/io/column_parsers.clj
@@ -8,7 +8,6 @@
             [tech.v3.datatype.bitmap :as bitmap]
             [tech.v3.datatype.errors :as errors]
             [tech.v3.datatype.argops :as argops]
-            [tech.v3.datatype.datetime :as dtype-dt]
             [tech.v3.datatype.protocols :as dtype-proto]
             [ham-fisted.api :as hamf])
   (:import [java.util UUID List]
@@ -254,6 +253,8 @@
                      unpacked-datatype
                      formatter)]
     [(make-safe-parser datetime (parser-fn v)) false]))
 
+(def ^:private datetime-datatype? (requiring-resolve 'tech.v3.datatype.datetime/datetime-datatype?))
+
 (defn parser-entry->parser-tuple
   [parser-kwd]
@@ -268,11 +269,11 @@
       [(find-fixed-parser parser-datatype) true]
       (instance? IFn parser-fn)
       [parser-fn true]
-      (and (dtype-dt/datetime-datatype? parser-datatype)
+      (and (datetime-datatype? parser-datatype)
           (string? parser-fn))
       (datetime-formatter-parser-fn parser-datatype
                                     (DateTimeFormatter/ofPattern parser-fn))
-      (and (dtype-dt/datetime-datatype? parser-datatype)
+      (and (datetime-datatype? parser-datatype)
           (instance? DateTimeFormatter parser-fn))
       (datetime-formatter-parser-fn parser-datatype parser-fn)
       (= :text parser-datatype)
diff --git a/src/tech/v3/dataset/io/csv.clj b/src/tech/v3/dataset/io/csv.clj
index 26a4a51ce..f9e063d26 100644
--- a/src/tech/v3/dataset/io/csv.clj
+++ b/src/tech/v3/dataset/io/csv.clj
@@ -3,6 +3,7 @@
   (:require [charred.api :as charred]
             [charred.bulk :as bulk]
             [charred.coerce :as coerce]
+            [tech.v3.dataset.io.csv-min :as csv-min]
             [tech.v3.dataset.io :as ds-io]
             [tech.v3.parallel.for :as pfor]
             [tech.v3.datatype :as dtype]
@@ -21,10 +22,6 @@
            [java.util Iterator]))
 
-(set! *warn-on-reflection* true)
-(set! *unchecked-math* :warn-on-boxed)
-
-
 (defn rows->dataset-fn
   "Create an efficiently callable function to parse row-batches into datasets.
   Returns function from row-iter->dataset. Options passed in here are the
   same as ->dataset."
   [{:keys [header-row?]
     :or {header-row? true}
     :as options}]
-  (fn [row-iter]
-    (let [row-iter (coerce/->iterator row-iter)
-          header-row (if (and header-row? (.hasNext row-iter))
-                       (vec (.next row-iter))
-                       [])
-          n-header-cols (count header-row)
-          {:keys [parsers col-idx->parser]}
-          (parse-context/options->col-idx-parse-context
-           options :string (fn [^long col-idx]
-                             (when (< col-idx n-header-cols)
-                               (header-row col-idx))))
-          n-records (get options :n-records (get options :num-rows))]
-      ;;initialize parsers so if there are no more rows we get a dataset with
-      ;;at least column names
-      (dotimes [idx n-header-cols]
-        (col-idx->parser idx))
-
-      (if n-records
-        (let [n-records (long n-records)]
-          (loop [continue? (.hasNext row-iter)
-                 row-idx 0]
-            (if continue?
-              (do
-                (reduce (hamf-rf/indexed-accum
-                         acc col-idx field
-                         (-> (col-idx->parser col-idx)
-                             (column-parsers/add-value! row-idx field)))
-                        nil
-                        (.next row-iter))
-                (recur (and (.hasNext row-iter)
-                            (< (inc row-idx) n-records))
-                       (unchecked-inc row-idx)))
-              (parse-context/parsers->dataset options parsers))))
-        (loop [continue? (.hasNext row-iter)
-               row-idx 0]
-          (if continue?
-            (do
-              (reduce (hamf-rf/indexed-accum
-                       acc col-idx field
-                       (-> (col-idx->parser col-idx)
-                           (column-parsers/add-value! row-idx field)))
-                      nil
-                      (.next row-iter))
-              (recur (.hasNext row-iter) (unchecked-inc row-idx)))
-            (parse-context/parsers->dataset options parsers)))))))
-
+  (csv-min/rows->dataset-fn options))
 
 
 (defn csv->dataset-seq
diff --git a/src/tech/v3/dataset/io/csv_min.clj b/src/tech/v3/dataset/io/csv_min.clj
new file mode 100644
index 000000000..4a1b01ab7
--- /dev/null
+++ b/src/tech/v3/dataset/io/csv_min.clj
@@ -0,0 +1,82 @@
+(ns tech.v3.dataset.io.csv-min
+  (:require [tech.v3.dataset.io.context :as parse-context]
+            [tech.v3.dataset.io.column-parsers :as column-parsers]
+            [charred.coerce :as coerce]
+            [charred.api :as charred]
+            [ham-fisted.reduce :as hamf-rf]))
+
+
+(set! *warn-on-reflection* true)
+(set! *unchecked-math* :warn-on-boxed)
+
+
+(defn rows->dataset-fn
+  "Create an efficiently callable function to parse row-batches into datasets.
+  Returns function from row-iter->dataset. Options passed in here are the
+  same as ->dataset."
+  [{:keys [header-row?]
+    :or {header-row? true}
+    :as options}]
+  (fn [row-iter]
+    (let [row-iter (coerce/->iterator row-iter)
+          header-row (if (and header-row? (.hasNext row-iter))
+                       (vec (.next row-iter))
+                       [])
+          n-header-cols (count header-row)
+          {:keys [parsers col-idx->parser]}
+          (parse-context/options->col-idx-parse-context
+           options :string (fn [^long col-idx]
+                             (when (< col-idx n-header-cols)
+                               (header-row col-idx))))
+          n-records (get options :n-records (get options :num-rows))]
+      ;;initialize parsers so if there are no more rows we get a dataset with
+      ;;at least column names
+      (dotimes [idx n-header-cols]
+        (col-idx->parser idx))
+
+      (if n-records
+        (let [n-records (long n-records)]
+          (loop [continue? (.hasNext row-iter)
+                 row-idx 0]
+            (if continue?
+              (do
+                (reduce (hamf-rf/indexed-accum
+                         acc col-idx field
+                         (-> (col-idx->parser col-idx)
+                             (column-parsers/add-value! row-idx field)))
+                        nil
+                        (.next row-iter))
+                (recur (and (.hasNext row-iter)
+                            (< (inc row-idx) n-records))
+                       (unchecked-inc row-idx)))
+              (parse-context/parsers->dataset options parsers))))
+        (loop [continue? (.hasNext row-iter)
+               row-idx 0]
+          (if continue?
+            (do
+              (reduce (hamf-rf/indexed-accum
+                       acc col-idx field
+                       (-> (col-idx->parser col-idx)
+                           (column-parsers/add-value! row-idx field)))
+                      nil
+                      (.next row-iter))
+              (recur (.hasNext row-iter) (unchecked-inc row-idx)))
+            (parse-context/parsers->dataset options parsers)))))))
+
+
+(defn csv->dataset
+  "Read a csv into a dataset. Same options as [[tech.v3.dataset/->dataset]]."
+  [input-path & [options]]
+  (let [s (charred/read-csv-supplier (java.io.File. (str input-path))
+                                     (merge {:profile :mutable} options))]
+    (try
+      (let [iter (if-let [n-initial-skip-rows (get options :n-initial-skip-rows)]
+                   (let [iter (coerce/->iterator s)]
+                     (dotimes [idx n-initial-skip-rows]
+                       (when (.hasNext iter) (.next iter)))
+                     iter)
+                   s)]
+        ((rows->dataset-fn options) iter))
+      (finally
+        (when (instance? java.lang.AutoCloseable s)
+          (.close ^java.lang.AutoCloseable s))))))
diff --git a/src/tech/v3/dataset/io/datetime.clj b/src/tech/v3/dataset/io/datetime.clj
index c6c3d3760..5ac196a10 100644
--- a/src/tech/v3/dataset/io/datetime.clj
+++ b/src/tech/v3/dataset/io/datetime.clj
@@ -1,7 +1,7 @@
 (ns tech.v3.dataset.io.datetime
   "Helpful and well tested string->datetime pathways."
   (:require [clojure.string :as s]
-            [tech.v3.datatype.datetime :as dtype-dt])
+            [tech.v3.datatype.datetime.constants :as dtype-dt])
   (:import [java.time LocalDate LocalDateTime LocalTime
             ZonedDateTime OffsetDateTime Instant Duration]
            [java.time.format DateTimeFormatter DateTimeFormatterBuilder]))
diff --git a/src/tech/v3/dataset/string_table.clj b/src/tech/v3/dataset/string_table.clj
index db5e778ac..0b8e72cdf 100644
--- a/src/tech/v3/dataset/string_table.clj
+++ b/src/tech/v3/dataset/string_table.clj
@@ -1,6 +1,5 @@
 (ns tech.v3.dataset.string-table
-  (:require [tech.v3.datatype :as dtype]
-            [tech.v3.datatype.protocols :as dtype-proto]
+  (:require [tech.v3.datatype.protocols :as dtype-proto]
             [tech.v3.datatype.base :as dtype-base]
             [tech.v3.dataset.dynamic-int-list :as int-list]
             [tech.v3.parallel.for :as parallel-for]
@@ -32,7 +31,7 @@
   (clone [this]
     ;;We do not need to dedup any more; a java array is a more efficient
     ;;storage mechanism
-    (dtype/make-container :jvm-heap :string this))
+    (dtype-proto/make-container :jvm-heap :string nil this))
   PStrTable
   (get-str-table [_this] {:int->str int->str
                           :str->int str->int})
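
Note on the column_parsers.clj hunk: the top-level :require on
tech.v3.datatype.datetime is dropped in favor of a requiring-resolve of
datetime-datatype?.  A minimal sketch of the pattern in isolation follows;
heavy.ns/helper is a hypothetical var, not part of this patch.

  ;; requiring-resolve requires the target namespace (if not already loaded)
  ;; at the moment this form is evaluated and returns the resolved var:
  (def ^:private helper (requiring-resolve 'heavy.ns/helper))

  ;; Wrapping the lookup in a delay instead defers the require to first use:
  (def ^:private helper* (delay (requiring-resolve 'heavy.ns/helper)))
  (defn use-helper [x] (@helper* x))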
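
The new tech.v3.dataset.io.csv-min namespace carries only the lightweight
requires needed for row parsing, so it can be loaded without the rest of the
csv machinery.  A minimal usage sketch; the file path and option values are
hypothetical, but the option keys (:header-row?, :n-initial-skip-rows,
:n-records) are the ones read by the code above.

  (require '[tech.v3.dataset.io.csv-min :as csv-min])

  ;; Parse a csv file, skipping two leading rows and stopping after 1000 records.
  (def ds (csv-min/csv->dataset "data/example.csv"
                                {:header-row? true
                                 :n-initial-skip-rows 2
                                 :n-records 1000}))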
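
In string_table.clj, clone now calls make-container from
tech.v3.datatype.protocols directly instead of going through the
tech.v3.datatype facade, which lets the namespace drop that require.  Judging
from the nil added in the new form, the protocol-level call takes the options
map explicitly where the facade defaults it; a sketch only:

  ;; Both forms build a :jvm-heap :string container from the table's elements;
  ;; the protocol-level variant just passes the options map (here nil) explicitly.
  (dtype-proto/make-container :jvm-heap :string nil ["a" "b" "a"])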