diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7cd94ef --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +/target +/classes +/checkouts +profiles.clj +pom.xml +pom.xml.asc +*.jar +*.class +/.lein-* +/.nrepl-port +.hgignore +.hg/ +.cpcache/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d3087e4 --- /dev/null +++ b/LICENSE @@ -0,0 +1,277 @@ +Eclipse Public License - v 2.0 + + THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE + PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION + OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT. + +1. DEFINITIONS + +"Contribution" means: + + a) in the case of the initial Contributor, the initial content + Distributed under this Agreement, and + + b) in the case of each subsequent Contributor: + i) changes to the Program, and + ii) additions to the Program; + where such changes and/or additions to the Program originate from + and are Distributed by that particular Contributor. A Contribution + "originates" from a Contributor if it was added to the Program by + such Contributor itself or anyone acting on such Contributor's behalf. + Contributions do not include changes or additions to the Program that + are not Modified Works. + +"Contributor" means any person or entity that Distributes the Program. + +"Licensed Patents" mean patent claims licensable by a Contributor which +are necessarily infringed by the use or sale of its Contribution alone +or when combined with the Program. + +"Program" means the Contributions Distributed in accordance with this +Agreement. + +"Recipient" means anyone who receives the Program under this Agreement +or any Secondary License (as applicable), including Contributors. 
+ +"Derivative Works" shall mean any work, whether in Source Code or other +form, that is based on (or derived from) the Program and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. + +"Modified Works" shall mean any work in Source Code or other form that +results from an addition to, deletion from, or modification of the +contents of the Program, including, for purposes of clarity any new file +in Source Code form that contains any contents of the Program. Modified +Works shall not include works that contain only declarations, +interfaces, types, classes, structures, or files of the Program solely +in each case in order to link to, bind by name, or subclass the Program +or Modified Works thereof. + +"Distribute" means the acts of a) distributing or b) making available +in any manner that enables the transfer of a copy. + +"Source Code" means the form of a Program preferred for making +modifications, including but not limited to software source code, +documentation source, and configuration files. + +"Secondary License" means either the GNU General Public License, +Version 2.0, or any later versions of that license, including any +exceptions or additional permissions as identified by the initial +Contributor. + +2. GRANT OF RIGHTS + + a) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free copyright + license to reproduce, prepare Derivative Works of, publicly display, + publicly perform, Distribute and sublicense the Contribution of such + Contributor, if any, and such Derivative Works. + + b) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free patent + license under Licensed Patents to make, use, sell, offer to sell, + import and otherwise transfer the Contribution of such Contributor, + if any, in Source Code or other form. 
This patent license shall + apply to the combination of the Contribution and the Program if, at + the time the Contribution is added by the Contributor, such addition + of the Contribution causes such combination to be covered by the + Licensed Patents. The patent license shall not apply to any other + combinations which include the Contribution. No hardware per se is + licensed hereunder. + + c) Recipient understands that although each Contributor grants the + licenses to its Contributions set forth herein, no assurances are + provided by any Contributor that the Program does not infringe the + patent or other intellectual property rights of any other entity. + Each Contributor disclaims any liability to Recipient for claims + brought by any other entity based on infringement of intellectual + property rights or otherwise. As a condition to exercising the + rights and licenses granted hereunder, each Recipient hereby + assumes sole responsibility to secure any other intellectual + property rights needed, if any. For example, if a third party + patent license is required to allow Recipient to Distribute the + Program, it is Recipient's responsibility to acquire that license + before distributing the Program. + + d) Each Contributor represents that to its knowledge it has + sufficient copyright rights in its Contribution, if any, to grant + the copyright license set forth in this Agreement. + + e) Notwithstanding the terms of any Secondary License, no + Contributor makes additional grants to any Recipient (other than + those set forth in this Agreement) as a result of such Recipient's + receipt of the Program under the terms of a Secondary License + (if permitted under the terms of Section 3). + +3. 
REQUIREMENTS + +3.1 If a Contributor Distributes the Program in any form, then: + + a) the Program must also be made available as Source Code, in + accordance with section 3.2, and the Contributor must accompany + the Program with a statement that the Source Code for the Program + is available under this Agreement, and informs Recipients how to + obtain it in a reasonable manner on or through a medium customarily + used for software exchange; and + + b) the Contributor may Distribute the Program under a license + different than this Agreement, provided that such license: + i) effectively disclaims on behalf of all other Contributors all + warranties and conditions, express and implied, including + warranties or conditions of title and non-infringement, and + implied warranties or conditions of merchantability and fitness + for a particular purpose; + + ii) effectively excludes on behalf of all other Contributors all + liability for damages, including direct, indirect, special, + incidental and consequential damages, such as lost profits; + + iii) does not attempt to limit or alter the recipients' rights + in the Source Code under section 3.2; and + + iv) requires any subsequent distribution of the Program by any + party to be under a license that satisfies the requirements + of this section 3. + +3.2 When the Program is Distributed as Source Code: + + a) it must be made available under this Agreement, or if the + Program (i) is combined with other material in a separate file or + files made available under a Secondary License, and (ii) the initial + Contributor attached to the Source Code the notice described in + Exhibit A of this Agreement, then the Program may be made available + under the terms of such Secondary Licenses, and + + b) a copy of this Agreement must be included with each copy of + the Program. 
+ +3.3 Contributors may not remove or alter any copyright, patent, +trademark, attribution notices, disclaimers of warranty, or limitations +of liability ("notices") contained within the Program from any copy of +the Program which they Distribute, provided that Contributors may add +their own appropriate notices. + +4. COMMERCIAL DISTRIBUTION + +Commercial distributors of software may accept certain responsibilities +with respect to end users, business partners and the like. While this +license is intended to facilitate the commercial use of the Program, +the Contributor who includes the Program in a commercial product +offering should do so in a manner which does not create potential +liability for other Contributors. Therefore, if a Contributor includes +the Program in a commercial product offering, such Contributor +("Commercial Contributor") hereby agrees to defend and indemnify every +other Contributor ("Indemnified Contributor") against any losses, +damages and costs (collectively "Losses") arising from claims, lawsuits +and other legal actions brought by a third party against the Indemnified +Contributor to the extent caused by the acts or omissions of such +Commercial Contributor in connection with its distribution of the Program +in a commercial product offering. The obligations in this section do not +apply to any claims or Losses relating to any actual or alleged +intellectual property infringement. In order to qualify, an Indemnified +Contributor must: a) promptly notify the Commercial Contributor in +writing of such claim, and b) allow the Commercial Contributor to control, +and cooperate with the Commercial Contributor in, the defense and any +related settlement negotiations. The Indemnified Contributor may +participate in any such claim at its own expense. + +For example, a Contributor might include the Program in a commercial +product offering, Product X. That Contributor is then a Commercial +Contributor. 
If that Commercial Contributor then makes performance +claims, or offers warranties related to Product X, those performance +claims and warranties are such Commercial Contributor's responsibility +alone. Under this section, the Commercial Contributor would have to +defend claims against the other Contributors related to those performance +claims and warranties, and if a court requires any other Contributor to +pay any damages as a result, the Commercial Contributor must pay +those damages. + +5. NO WARRANTY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS" +BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR +IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF +TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR +PURPOSE. Each Recipient is solely responsible for determining the +appropriateness of using and distributing the Program and assumes all +risks associated with its exercise of rights under this Agreement, +including but not limited to the risks and costs of program errors, +compliance with applicable laws, damage to or loss of data, programs +or equipment, and unavailability or interruption of operations. + +6. DISCLAIMER OF LIABILITY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS +SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST +PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE +EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + +7. 
GENERAL + +If any provision of this Agreement is invalid or unenforceable under +applicable law, it shall not affect the validity or enforceability of +the remainder of the terms of this Agreement, and without further +action by the parties hereto, such provision shall be reformed to the +minimum extent necessary to make such provision valid and enforceable. + +If Recipient institutes patent litigation against any entity +(including a cross-claim or counterclaim in a lawsuit) alleging that the +Program itself (excluding combinations of the Program with other software +or hardware) infringes such Recipient's patent(s), then such Recipient's +rights granted under Section 2(b) shall terminate as of the date such +litigation is filed. + +All Recipient's rights under this Agreement shall terminate if it +fails to comply with any of the material terms or conditions of this +Agreement and does not cure such failure in a reasonable period of +time after becoming aware of such noncompliance. If all Recipient's +rights under this Agreement terminate, Recipient agrees to cease use +and distribution of the Program as soon as reasonably practicable. +However, Recipient's obligations under this Agreement and any licenses +granted by Recipient relating to the Program shall continue and survive. + +Everyone is permitted to copy and distribute copies of this Agreement, +but in order to avoid inconsistency the Agreement is copyrighted and +may only be modified in the following manner. The Agreement Steward +reserves the right to publish new versions (including revisions) of +this Agreement from time to time. No one other than the Agreement +Steward has the right to modify this Agreement. The Eclipse Foundation +is the initial Agreement Steward. The Eclipse Foundation may assign the +responsibility to serve as the Agreement Steward to a suitable separate +entity. Each new version of the Agreement will be given a distinguishing +version number. 
The Program (including Contributions) may always be +Distributed subject to the version of the Agreement under which it was +received. In addition, after a new version of the Agreement is published, +Contributor may elect to Distribute the Program (including its +Contributions) under the new version. + +Except as expressly stated in Sections 2(a) and 2(b) above, Recipient +receives no rights or licenses to the intellectual property of any +Contributor under this Agreement, whether expressly, by implication, +estoppel or otherwise. All rights in the Program not expressly granted +under this Agreement are reserved. Nothing in this Agreement is intended +to be enforceable by any entity that is not a Contributor or Recipient. +No third-party beneficiary rights are created under this Agreement. + +Exhibit A - Form of Secondary Licenses Notice + +"This Source Code may also be made available under the following +Secondary Licenses when the conditions for such availability set forth +in the Eclipse Public License, v. 2.0 are satisfied: {name license(s), +version(s), and exceptions or additional permissions here}." + + Simply including a copy of this Agreement, including this Exhibit A + is not sufficient to license the Source Code under Secondary Licenses. + + If it is not possible or desirable to put the notice in a particular + file, then You may include the notice in a location (such as a LICENSE + file in a relevant directory) where a recipient would be likely to + look for such a notice. + + You may add additional accurate notices of copyright ownership. diff --git a/README.org b/README.org new file mode 100644 index 0000000..276cc15 --- /dev/null +++ b/README.org @@ -0,0 +1,76 @@ +#+TITLE: clj-pgcopy + +Import data into postgres quickly, implemented using postgresql's +=COPY= in binary format. 
+ +* Input Type mapping + +** Basic type mapping + +| JVM type | Postgres type | +|----------------+-------------------------------| +| Short | int2 (aka smallint) | +| Integer | int4 (aka integer) | +| Long | int8 (aka bigint) | +| Float | float4 (aka real) | +| Double | float8 (aka double precision) | +| BigDecimal | numeric/decimal | +| Boolean | boolean | +| String | text/varchar/char | +| java.util.UUID | uuid | + +** Date-related mappings + +| JVM type | Postgres type | +|---------------------------------+---------------| +| java.sql.Date | date | +| java.time.LocalDate | date | +| java.util.Date | timestamp[tz] | +| java.sql.Timestamp | timestamp[tz] | +| java.time.Instant | timestamp[tz] | +| java.time.ZonedDateTime | timestamp[tz] | +| java.time.OffsetDateTime | timestamp[tz] | +| org.postgresql.util.PGInterval | interval | + +** Geometric mappings + +| JVM type | Postgres type | +|-------------------------------------+---------------| +| org.postgresql.geometric.PGpoint | point | +| org.postgresql.geometric.PGline | line | +| org.postgresql.geometric.PGpath | path | +| org.postgresql.geometric.PGbox | box | +| org.postgresql.geometric.PGcircle | circle | +| org.postgresql.geometric.PGpolygon | polygon | + +Things that are String-like, or serialized in string form, should work +using the String -> text mapping. An exception is the =jsonb= type, +because the binary format requires a version signifier. Wrapping a +JSON string in the =JsonB= type provided by this library handles +that. + +** Arrays + +Implemented for the following JVM-typed arrays: + +| JVM type | Postgres type | +|------------------+-----------------------------------| +| int[] | int4[] (aka integer[]) | +| long[] | int8[] (aka bigint[]) | +| float[] | float4[] (aka real[]) | +| double[] | float8[] (aka double precision[]) | +| byte[] | bytea | +| String[] | text[] (or varchar) | +| java.util.UUID[] | uuid[] | + + +Currently, only 1-dimensional Postgres arrays are supported. 
+ +** TODO + +- hstore (wrapper?) +- inet, cidr, macaddr, macaddr8 +- bit strings +- composite types +- range types +- more array types? (date, timestamp, etc) diff --git a/deps.edn b/deps.edn new file mode 100644 index 0000000..16415cd --- /dev/null +++ b/deps.edn @@ -0,0 +1,12 @@ +{:paths ["src"] + :deps {org.postgresql/postgresql {:mvn/version "42.2.6"}} + :aliases {:run-tests + {:extra-paths ["test"] + :extra-deps {lambdaisland/kaocha {:mvn/version "0.0-529"} + org.clojure/java.jdbc {:mvn/version "0.7.9"}} + :main-opts ["-m" "kaocha.runner"]} + :dev + {:extra-paths ["dev" "test"] + :extra-deps {org.clojure/tools.namespace {:mvn/version "0.3.0"} + org.clojure/java.jdbc {:mvn/version "0.7.9"}} + :jvm-opts ["-XX:-OmitStackTraceInFastThrow"]}}} diff --git a/dev/user.clj b/dev/user.clj new file mode 100644 index 0000000..4d153a7 --- /dev/null +++ b/dev/user.clj @@ -0,0 +1,19 @@ +(ns ^{:clojure.tools.namespace.repl/load false + :clojure.tools.namespace.repl/unload false} user + (:require + ;; Defaults copied from clojure.main + [clojure.repl :refer (source apropos dir pst doc find-doc)] + [clojure.java.javadoc :refer (javadoc)] + [clojure.pprint :refer (pp pprint)] + ;; very common convenience ns aliases + [clojure.java.io :as io] + [clojure.set :as set] + [clojure.edn :as edn] + [clojure.string :as str] + [clojure.test :as test] + [clojure.tools.namespace.repl :as ctn])) + +(ctn/set-refresh-dirs "src" "test") + +(defn reset [] + (ctn/refresh)) diff --git a/src/clj_pgcopy/core.clj b/src/clj_pgcopy/core.clj new file mode 100644 index 0000000..b169cae --- /dev/null +++ b/src/clj_pgcopy/core.clj @@ -0,0 +1,423 @@ +(ns clj-pgcopy.core + (:require [clojure.string :as str] + [clj-pgcopy.time :as ptime]) + (:import (java.io ByteArrayInputStream + ByteArrayOutputStream + BufferedReader + BufferedOutputStream + DataOutputStream) + (java.nio ByteBuffer) + (java.time LocalDateTime + LocalDate + ZonedDateTime + OffsetDateTime + Instant) + (java.util UUID) + 
(org.postgresql.geometric PGbox + PGcircle + PGline + PGpath + PGpolygon + PGpoint) + (org.postgresql.util PGInterval) + (org.postgresql.copy CopyManager + PGCopyOutputStream) + (org.postgresql.core BaseConnection))) + +(set! *warn-on-reflection* true) + +(defprotocol IPGBinaryWrite + (pg-type [this]) + (write-to [this ^DataOutputStream out])) + +(def oids ;; keyword -> OID of the built-in Postgres type (pg_type.oid) + {:bytea 17 + :text 25 + :int4 23 + :int8 20 + :int2 21 + :char 18 + :boolean 16 + :jsonb 3802 + :xml 142 + :point 600 + :line 628 + :path 602 + :box 603 + :polygon 604 + :circle 718 + :float4 700 + :float8 701 + :unknown 705 + :varchar 1043 + :date 1082 + :timestamp 1114 + :timestamptz 1184 + :interval 1186 + :numeric 1700 + :uuid 2950}) + +(defn array->bytes + "Returns the binary encoding of coll as a 1-dimensional Postgres array of pg-type elements, or nil when pg-type has no entry in oids" + [pg-type coll] + (let [baos ^ByteArrayOutputStream (ByteArrayOutputStream. 1024) + [_ oid] (find oids pg-type) + coll (seq coll)] + (when oid + (with-open [out ^DataOutputStream (DataOutputStream. baos)] + (.writeInt out 1) ;; number of dimensions (only 1 dimensional) + (.writeInt out 1) ;; has-nulls flag: elements may be NULL + (.writeInt out oid) ;; oid of collection elements + (.writeInt out (count coll)) ;; number of elements in the dimension + (.writeInt out 1) ;; lower bound of the dimension (Postgres default is 1) + (doseq [el coll] + (write-to el out))) + (.toByteArray baos)))) + +;; Inspired by Java impl here: +;; https://github.com/bytefish/PgBulkInsert/blob/master/PgBulkInsert/src/main/java/de/bytefish/pgbulkinsert/pgsql/handlers/BigDecimalValueHandler.java +(defn numeric-components [^BigDecimal bd] + (let [unscaled ^BigInteger (.unscaledValue bd) + sign (.signum bd) + unscaled (if (= -1 sign) (.negate unscaled) unscaled) + fraction-digits ^int (.scale bd) + fraction-groups (unchecked-divide-int (unchecked-add-int fraction-digits (int 3)) + 4) + scale-remainder (mod fraction-digits 4) + [unscaled digits] (if (zero? scale-remainder) + [unscaled (list)] + ;; scale the first value + (let [result (.divideAndRemainder unscaled (.pow (BigInteger. 
"10") scale-remainder)) + digit (unchecked-multiply-int ^int (.intValue ^BigInteger (aget result 1)) + (int (Math/pow 10 (- 4 scale-remainder))))] + [(aget result 0) (list digit)])) + digits (loop [^BigInteger unscaled unscaled + digits digits] + (if (.equals unscaled BigInteger/ZERO) + digits + (let [result (.divideAndRemainder unscaled (BigInteger. "10000"))] + (recur + (aget result 0) + (cons (.intValue ^BigInteger (aget result 1)) digits)))))] + {:sign sign + :fraction-groups fraction-groups + :fraction-digits fraction-digits + :digits digits})) + +(extend-protocol IPGBinaryWrite + (Class/forName "[B") + (pg-type [_] :bytea) + (write-to [ba ^DataOutputStream out] + (.writeInt out (count ba)) + (doseq [b ba] + (.writeByte out b))) + + String + (pg-type [_] :text) + (write-to [string ^DataOutputStream out] + (write-to ^bytes (.getBytes string "UTF-8") out)) + + Short + (pg-type [_] :int2) + (write-to [sh ^DataOutputStream out] + (.writeInt out 2) + (.writeShort out sh)) + + Integer + (pg-type [_] :int4) + (write-to [integer ^DataOutputStream out] + (.writeInt out 4) + (.writeInt out integer)) + + Long + (pg-type [_] :int8) + (write-to [num ^DataOutputStream out] + (.writeInt out 8) + (.writeLong out num)) + + Float + (pg-type [_] :float4) + (write-to [f ^DataOutputStream out] + (.writeInt out 4) + (.writeFloat out (.floatValue f))) + + Double + (pg-type [_] :float8) + (write-to [d ^DataOutputStream out] + (.writeInt out 8) + (.writeDouble out (.doubleValue d))) + + BigDecimal + (pg-type [_] :numeric) + (write-to [bd ^DataOutputStream out] + (let [{:keys [fraction-digits fraction-groups sign digits]} + (numeric-components bd) + n-digits (int (count digits))] + (.writeInt out (int (+ 8 (* 2 n-digits)))) + (.writeShort out n-digits) + (.writeShort out (- n-digits fraction-groups 1)) + (.writeShort out (int (if (= sign 1) 0x0000 0x4000))) + (.writeShort out fraction-digits) + (doseq [digit digits] + (.writeShort out (int digit))))) + + Boolean + (pg-type [_] 
:boolean) + (write-to [bool ^DataOutputStream out] + (.writeInt out 1) + (.writeByte out (if bool 1 0))) + + PGInterval + (pg-type [_] :interval) + (write-to [interval ^DataOutputStream out] + (.writeInt out 16) + (let [secs (.getSeconds interval) + mins (double (.getMinutes interval)) + hours (double (.getHours interval)) + seconds (+ secs (* 60 mins) (* 60 60 hours)) + days (.getDays interval) + months (.getMonths interval) + years (.getYears interval)] + (.writeLong out (Math/round (double (* 1000000 seconds)))) ;; microseconds; round instead of truncating floating-point error + (.writeInt out days) + (.writeInt out (+ months (* 12 years))))) + + java.util.Date + (pg-type [_] :timestamp) + (write-to [date ^DataOutputStream out] + (write-to (ptime/java-epoch->postgres-epoch (.getTime date)) out)) + + Instant + (pg-type [_] :timestamp) + (write-to [instant ^DataOutputStream out] + (write-to (ptime/java-epoch->postgres-epoch (.toEpochMilli instant)) out)) + + LocalDate + (pg-type [_] :date) + (write-to [date ^DataOutputStream out] + (let [days (-> date + .atStartOfDay + ptime/date-time->epoch-milli + ptime/java-epoch->postgres-days + int)] + (.writeInt out 4) + (.writeInt out days))) + + java.sql.Date + (pg-type [_] :date) + (write-to [date ^DataOutputStream out] + (write-to ^LocalDate (.toLocalDate date) out)) + + java.sql.Timestamp + (pg-type [_] :timestamp) + (write-to [ts ^DataOutputStream out] + (write-to ^Instant (.toInstant ts) out)) + + ;; Do not assume UTC, needs a time zone or offset + ;; LocalDateTime + ;; (pg-type [_] :timestamp) + ;; (write-to [ldt ^DataOutputStream out] + ;; (write-to ^Instant (.toInstant ldt) out)) + + ZonedDateTime + (pg-type [_] :timestamp) + (write-to [zdt ^DataOutputStream out] + (write-to ^Instant (.toInstant zdt) out)) + + OffsetDateTime + (pg-type [_] :timestamp) + (write-to [odt ^DataOutputStream out] + (write-to ^Instant (.toInstant odt) out)) + + PGpoint + (pg-type [_] :point) + (write-to [p ^DataOutputStream out] + (.writeInt out 16) + (.writeDouble out (.-x p)) + (.writeDouble out (.-y p))) + + PGline 
+ (pg-type [_] :line) + (write-to [line ^DataOutputStream out] + (.writeInt out 24) + (.writeDouble out (.-a line)) + (.writeDouble out (.-b line)) + (.writeDouble out (.-c line))) + + PGpath + (pg-type [_] :path) + (write-to [p ^DataOutputStream out] + (let [points (.-points p) + closed (byte (if (.-open p) 0 1)) + byte-count (+ 1 ;; open/closed + 4 ;; number of points + (* 16 (count points)) ;; point data + )] + (.writeInt out byte-count) + (.writeByte out closed) + (.writeInt out (count points)) + (doseq [^PGpoint point points] + (.writeDouble out (.-x point)) + (.writeDouble out (.-y point))))) + + PGpolygon + (pg-type [_] :polygon) + (write-to [p ^DataOutputStream out] + (let [points (.-points p) + byte-count (+ 4 (* 16 (count points)))] + (.writeInt out byte-count) + (.writeInt out (count points)) + (doseq [^PGpoint point points] + (.writeDouble out (.-x point)) + (.writeDouble out (.-y point))))) + + PGbox + (pg-type [_] :box) + (write-to [box ^DataOutputStream out] + (let [points (.-point box)] + (.writeInt out 32) + (doseq [^PGpoint point points] + (.writeDouble out (.-x point)) + (.writeDouble out (.-y point))))) + + PGcircle + (pg-type [_] :circle) + (write-to [c ^DataOutputStream out] + (let [center ^PGpoint (.-center c)] + (.writeInt out 24) + (.writeDouble out (.-x center)) + (.writeDouble out (.-y center)) + (.writeDouble out (.-radius c)))) + + ;; TODO Is this a good idea? + ;; clojure.lang.APersistentVector + ;; (pg-type [_] nil) ;; array's use sub oid + ;; (write-to [this ^DataOutputStream out] + ;; (if (and (seq this) (satisfies? 
IPGBinaryWrite (first this))) + ;; (if-some [ba ^bytes (array->bytes (pg-type (first this)) this)] + ;; (do + ;; (.writeInt out (count ba)) + ;; (.write out ba)) + ;; (write-to nil out)) + ;; (write-to nil out))) + + UUID + (pg-type [_] :uuid) + (write-to [uuid ^DataOutputStream out] + (let [bb ^ByteBuffer (ByteBuffer/wrap (byte-array 16))] + (.putLong bb (.getMostSignificantBits uuid)) + (.putLong bb (.getLeastSignificantBits uuid)) + (.writeInt out 16) + (.writeInt out (.getInt bb 0)) + (.writeShort out (.getShort bb 4)) + (.writeShort out (.getShort bb 6)) + (.writeLong out (.getLong bb 8)))) + + nil + (pg-type [_] :unknown) ;; must be named pg-type: any other name is not a method of IPGBinaryWrite + (write-to [_ ^DataOutputStream out] + (.writeInt out -1))) ;; a field length of -1 and no payload + +;; These primitive array impls have to be added separate or the +;; compiler complains + +(defmacro extend-primitive-array + [klass pg-type] + `(extend-type ~klass + IPGBinaryWrite + (pg-type [~'_] nil) + (write-to [this# ^DataOutputStream out#] + (if-some [ba# ^{:tag ~'bytes} (array->bytes ~pg-type this#)] + (do + (.writeInt out# (count ba#)) + (.write out# ba#)) + (write-to nil out#))))) + + ;; int array +(extend-primitive-array (Class/forName "[I") :int4) + + ;; long array +(extend-primitive-array (Class/forName "[J") :int8) + +;; float array +(extend-primitive-array (Class/forName "[F") :float4) + +;; double array +(extend-primitive-array (Class/forName "[D") :float8) + +;; String array +(extend-primitive-array (Class/forName "[Ljava.lang.String;") :text) + +(extend-primitive-array (Class/forName "[Ljava.util.UUID;") :uuid) + +(deftype JsonB [^String value] + Object + (toString [_] value) + IPGBinaryWrite + (pg-type [_] :jsonb) + (write-to [_ o] + (let [out ^DataOutputStream o] + (if (str/blank? 
value) + (write-to nil out) ;; blank JSON goes through the nil impl; the value is the first argument, the stream the second + (let [ba ^bytes (.getBytes value "UTF-8")] + (.writeInt out (+ 1 (count ba))) + (.writeByte out 1) ;; jsonb protocol version + (.write out ba)))))) + +(defn copy-to-stream [^ByteArrayOutputStream stream tuples] + (with-open [out ^DataOutputStream (DataOutputStream. (BufferedOutputStream. stream))] + ;; constant header + (.writeBytes out "PGCOPY\n\377\r\n\0") + ;; header flags (no OIDs) + (.writeInt out 0) + ;; header extension (unused) + (.writeInt out 0) + (doseq [tuple tuples] + (.writeShort out (count tuple)) + (doseq [field tuple] + (write-to field out))) + ;; footer constant + (.writeShort out (short -1)) + (.flush out))) + +(defn values->copy-rows-binary ^bytes [values] + (with-open [bout ^ByteArrayOutputStream (ByteArrayOutputStream. 4096)] + (copy-to-stream bout values) + (.toByteArray bout))) + +(defn ^CopyManager copy-manager [^java.sql.Connection conn] + (let [conn (if (.isWrapperFor conn BaseConnection) + (.unwrap conn BaseConnection) + conn)] + (CopyManager. conn))) + +(defn copy-table + ([conn table] + (copy-table conn table {:csv? true :headers? false})) + ([^java.sql.Connection conn table {:keys [csv? headers?]}] + (let [manager (copy-manager conn) + copy-query (str "COPY " table " TO STDOUT" + (when csv? " WITH CSV") + (when headers? " HEADER"))] + (with-open [out (ByteArrayOutputStream.)] + (.copyOut manager copy-query out) + (String. (.toByteArray out) "UTF-8"))))) ;; decode explicitly as UTF-8 rather than the platform default charset + +(defn copy-into! + "table-sql is the table name and columns for the COPY statement, + e.g. myschema.mytable(col1, col2). It should match the order of the + tuples exactly." + ([^java.sql.Connection conn + table-sql + values] + (let [manager (copy-manager conn) + vals (values->copy-rows-binary values)] + (with-open [stream (ByteArrayInputStream. 
vals)]
+       (.copyIn manager (str "COPY " (name table-sql)
+                             " FROM STDIN WITH BINARY") stream))))
+  ([^java.sql.Connection conn table cols values]
+   (let [table-spec (str (name table)
+                         "("
+                         (str/join "," (map name cols))
+                         ")")]
+     (copy-into! conn table-spec values))))
diff --git a/src/clj_pgcopy/time.clj b/src/clj_pgcopy/time.clj
new file mode 100644
index 0000000..1ce92d0
--- /dev/null
+++ b/src/clj_pgcopy/time.clj
@@ -0,0 +1,95 @@
+(ns clj-pgcopy.time
+  "Conversions from JDK date/time values to java.time.Instant and to offsets
+  from the postgres epoch (2000-01-01T00:00:00 UTC)."
+  (:import (java.time LocalDateTime
+                      LocalDate
+                      ZoneOffset
+                      ZonedDateTime
+                      OffsetDateTime
+                      Instant)
+           (java.util.concurrent TimeUnit)))
+
+(defn ^Long date-time->epoch-milli
+  "Returns milliseconds since the Java epoch (1970-01-01T00:00:00Z) for `dt`,
+  treating the zone-less LocalDateTime as UTC."
+  [^LocalDateTime dt]
+  (.. dt
+      (atOffset ZoneOffset/UTC)
+      toInstant
+      toEpochMilli))
+
+(def epoch-distance
+  "Milliseconds from the Java epoch to the postgres epoch
+  (2000-01-01T00:00:00 UTC)."
+  (-
+   (date-time->epoch-milli
+    ;; postgres epoch
+    (LocalDateTime/of 2000 1 1 0 0 0))
+   (.toEpochMilli Instant/EPOCH)))
+
+;; 946684800000
+
+;; The conversion is valid for any year 1583 CE onwards.
+(defn java-epoch->postgres-epoch
+  "Converts `millis` since the Java epoch to microseconds since the postgres
+  epoch."
+  ^long [millis]
+  ;; returns microseconds
+  (long (* 1000 (- millis epoch-distance))))
+
+(defn java-epoch->postgres-days
+  "Converts `millis` since the Java epoch to whole days since the postgres
+  epoch, as an int.
+
+  Uses floor division: TimeUnit/toDays truncates toward zero, which maps an
+  instant part-way through a day before 2000-01-01 onto the following day."
+  [millis]
+  (int (Math/floorDiv (long (- millis epoch-distance))
+                      (.toMillis TimeUnit/DAYS 1))))
+
+(defprotocol ToInstant
+  "Coercion of date-like values to java.time.Instant."
+  (-to-instant [self]
+    "Returns `self` as an Instant, or nil when `self` is nil."))
+
+;; Zone-less values (LocalDateTime, LocalDate) are interpreted as UTC.
+;; java.sql.Date is converted via LocalDate instead of calling .toInstant.
+(extend-protocol ToInstant
+  LocalDateTime
+  (-to-instant [self]
+    (.. self
+        (atOffset ZoneOffset/UTC)
+        toInstant))
+  LocalDate
+  (-to-instant [self]
+    (.. self
+        atStartOfDay
+        (atOffset ZoneOffset/UTC)
+        toInstant))
+  Instant
+  (-to-instant [self]
+    self)
+  java.util.Date
+  (-to-instant [self]
+    (.toInstant self))
+  java.sql.Date
+  (-to-instant [self]
+    (-to-instant
+     (.toLocalDate self)))
+  java.sql.Timestamp
+  (-to-instant [self]
+    (.toInstant self))
+  ZonedDateTime
+  (-to-instant [self]
+    (.toInstant self))
+  OffsetDateTime
+  (-to-instant [self]
+    (.toInstant self))
+  nil
+  (-to-instant [_]
+    nil))
+
+(defn to-instant
+  "Coerces `date-ish` to a java.time.Instant via the ToInstant protocol."
+  [date-ish]
+  (-to-instant date-ish))
diff --git a/test/clj_pgcopy/core_test.clj b/test/clj_pgcopy/core_test.clj
new file mode 100644
index 0000000..f4f22b9
--- /dev/null
+++ b/test/clj_pgcopy/core_test.clj
@@ -0,0 +1,255 @@
+(ns clj-pgcopy.core-test
+  (:require [clj-pgcopy.core :as copy]
+            [clj-pgcopy.time :as ptime]
+            [clojure.test :refer :all]
+            [clojure.java.jdbc :as jdbc])
+  (:import (java.time LocalDateTime
+                      LocalDate
+                      Instant
+                      ZoneId
+                      ZoneOffset)
+           (org.postgresql.util PGInterval)
+           (org.postgresql.geometric PGbox
+                                     PGcircle
+                                     PGline
+                                     PGpath
+                                     PGpolygon
+                                     PGpoint)
+           (java.util TimeZone)))
+
+;; JDBC URL of the database the tests run against.
+(def conn-spec "jdbc:postgresql://localhost:5432/test_pgcopy")
+
+;; Normalizes values to plain strings so rows can be compared with `=`.
+(defprotocol IStringValue
+  (string-value [_]))
+
+(extend-protocol IStringValue
+  String
+  (string-value [this] this)
+  nil
+  (string-value [_] nil)
+  org.postgresql.jdbc.PgSQLXML
+  (string-value [this] (.getString this))
+  Object
+  (string-value [this] (str this)))
+
+(use-fixtures :each
+  ;; Runs each test with UTC as the JVM default time zone. The previous
+  ;; default is restored in `finally` so it is not leaked when `f` throws.
+  (fn use-utc [f]
+    (let [original (TimeZone/getDefault)]
+      (TimeZone/setDefault (TimeZone/getTimeZone "UTC"))
+      (try
+        (f)
+        (finally
+          (TimeZone/setDefault original)))))
+  ;; Recreates the copytest schema and its test table around each test.
+  ;; The schema is dropped in `finally` so it is removed even when `f` throws.
+  (fn create-test-tables [f]
+    (jdbc/with-db-connection [conn conn-spec]
+      (jdbc/execute! conn "drop schema if exists copytest cascade")
+      (jdbc/execute! conn "create schema copytest")
+      (jdbc/execute! conn "create extension if not exists citext")
+      (jdbc/execute! conn (str "create table copytest.test("
+                               "c_int integer,"
+                               "c_bigint bigint,"
+                               "c_smallint smallint,"
+                               "c_text text,"
+                               "c_varchar varchar(32),"
+                               "c_char char(2),"
+                               "c_citext citext,"
+                               "c_text_array text[],"
+                               "c_int_array int[],"
+                               "c_double_array float8[],"
+                               "c_uuid_array uuid[],"
+                               "c_date date,"
+                               "c_ts timestamp,"
+                               "c_tstz timestamptz,"
+                               "c_boolean boolean,"
+                               "c_numeric numeric,"
+                               "c_decimal decimal(8,2),"
+                               "c_float4 float4,"
+                               "c_float8 float8,"
+                               "c_uuid uuid,"
+                               "c_json json,"
+                               "c_jsonb jsonb,"
+                               "c_xml xml,"
+                               "c_interval interval,"
+                               "c_point point,"
+                               "c_line line,"
+                               "c_path path,"
+                               "c_box box,"
+                               "c_circle circle,"
+                               "c_polygon polygon,"
+                               "c_bytea bytea)"))
+      (try
+        (f)
+        (finally
+          (jdbc/execute! conn "drop schema if exists copytest cascade"))))))
+
+;; Round-trips one fully populated row and one row whose values are all nil
+;; through copy/copy-into! and compares the queried rows with the input.
+(deftest copy-columns-test
+  (let [row1 {:c_smallint (short 42)
+              :c_int (int 42)
+              :c_bigint (* 4 Integer/MAX_VALUE)
+              :c_text "Text"
+              :c_varchar "Varchar"
+              :c_char "ab"
+              :c_citext "CIText"
+              :c_text_array ["examples" "of" "text"]
+              :c_int_array [1 2 3 4 5]
+              :c_uuid_array [#uuid "cc7caf20-14e1-42de-a1e7-455d4f267111"
+                             #uuid "cc7caf20-14e1-42de-a1e7-455d4f267112"
+                             #uuid "cc7caf20-14e1-42de-a1e7-455d4f267113"]
+              :c_double_array [1.2 3.4 5.6]
+              :c_date (LocalDate/of 2018 1 2)
+              :c_ts (Instant/parse "2018-01-02T03:04:05.678Z")
+              :c_tstz (Instant/parse "2018-01-02T03:04:05.678Z")
+              :c_boolean true
+              :c_numeric 123456789.012345M
+              :c_decimal 4777.39M
+              :c_float4 (float 3.14159)
+              :c_float8 Math/PI
+              :c_uuid #uuid "cc7caf20-14e1-42de-a1e7-455d4f267111"
+              :c_json "{\"hello\": [1, 2, 3]}"
+              :c_jsonb (copy/->JsonB "{\"goodbye\": [3, 2, 1]}")
+              :c_xml ""
+              :c_interval (PGInterval. 1 2 3 4 5 6.789)
+              :c_point (PGpoint. "(1.2, -3.4)")
+              :c_line (PGline. "{1.2, -3.4, 5.6}")
+              :c_path (PGpath. "((1.2,-3.4),(-5.6,7.8),(9.0, 0.1))")
+              :c_polygon (PGpolygon. "((1.2,-3.4),(-5.6,7.8),(9.0, 0.1))")
+              :c_box (PGbox. "(-5.6,7.8),(1.2,-3.4)")
+              :c_circle (PGcircle. "<(1.2,-3.4), 9.1>")
+              :c_bytea "the byte array"}
+        columns (-> row1 keys vec)
+        data (vector row1 (into {} (map vector columns (repeat nil))))
+        values (into [] (comp (map (fn [row] (-> row
+                                                 (update :c_text_array #(into-array String %))
+                                                 (update :c_int_array int-array)
+                                                 (update :c_double_array double-array)
+                                                 (update :c_uuid_array #(into-array java.util.UUID %))
+                                                 (update :c_bytea #(if (string? %)
+                                                                     (.getBytes % "UTF-8")
+                                                                     %)))))
+                              (map (apply juxt columns)))
+                    data)
+        ;; Decode with the same charset used to encode :c_bytea above.
+        bytes->string #(when (and % (pos? (count %))) (String. % "UTF-8"))
+        ;; Array column value -> vector via .getArray; nil and empty both become nil.
+        array->vec #(when % (not-empty (vec (.getArray %))))]
+    (jdbc/with-db-connection [conn conn-spec]
+      (is (= (count data)
+             (copy/copy-into! (:connection conn) :copytest.test columns values)))
+      (let [results (->> (jdbc/query conn ["select * from copytest.test"])
+                         (map #(select-keys % columns)))]
+        (is (= (into []
+                     (comp
+                      (map
+                       (fn [row]
+                         (-> row
+                             (update :c_date ptime/to-instant)
+                             (update :c_jsonb string-value))))
+                      ;;(map (apply juxt columns))
+                      )
+                     data)
+               (into []
+                     (comp
+                      (map (fn [row]
+                             (-> row
+                                 (update :c_date ptime/to-instant)
+                                 (update :c_ts ptime/to-instant)
+                                 (update :c_tstz ptime/to-instant)
+                                 (update :c_bytea bytes->string)
+                                 (update :c_text_array array->vec)
+                                 (update :c_int_array array->vec)
+                                 (update :c_double_array array->vec)
+                                 (update :c_uuid_array array->vec)
+                                 (update :c_citext string-value)
+                                 (update :c_json string-value)
+                                 (update :c_jsonb string-value)
+                                 (update :c_xml string-value))))
+                      ;;(map (apply juxt columns))
+                      )
+                     results)))))))
+
+;; Each date-time input type listed below is written to both the timestamp
+;; and timestamptz columns and must read back as the same instant.
+(deftest datetime-inputs-test
+  (let [instant (Instant/parse "2018-01-02T03:04:05.678Z")
+        data [{;; ZonedDateTime -> timestamp
+               :c_ts (.atZone (LocalDateTime/ofInstant instant (ZoneId/of "Z"))
+                              (ZoneId/of "Z"))
+               ;; ZonedDateTime -> timestamptz
+               :c_tstz (.atZone (LocalDateTime/ofInstant instant (ZoneId/of "Z"))
+                                (ZoneId/of "Z"))}
+
+              {;; OffsetDateTime -> timestamp
+               :c_ts (.atOffset (LocalDateTime/ofInstant instant (ZoneId/of "Z"))
+                                ZoneOffset/UTC)
+               ;; OffsetDateTime -> timestamptz
+               :c_tstz (.atOffset (LocalDateTime/ofInstant instant (ZoneId/of "Z"))
+                                  ZoneOffset/UTC)}
+
+              {;; Instant -> timestamp
+               :c_ts instant
+               ;; Instant -> timestamptz
+               :c_tstz instant}
+
+              {;; java.util.Date -> timestamp
+               :c_ts (java.util.Date/from instant)
+               ;; java.util.Date -> timestamptz
+               :c_tstz (java.util.Date/from instant)}
+
+              {;; java.sql.Timestamp -> timestamp
+               :c_ts (java.sql.Timestamp/from instant)
+               ;; java.sql.Timestamp -> timestamptz
+               :c_tstz (java.sql.Timestamp/from instant)}]
+        columns (-> data first keys vec)
+        values (into [] (map (apply juxt columns)) data)
+        fixup-row (fn [row]
+                    (-> row
+                        (update :c_tstz ptime/to-instant)
+                        (update :c_ts ptime/to-instant)))
+        rows-xform (comp
+                    (map fixup-row)
+                    (map (apply juxt columns)))]
+    (jdbc/with-db-connection [conn conn-spec]
+      (is (= (count data)
+             (copy/copy-into! (:connection conn) :copytest.test columns values)))
+      (let [results (->> (jdbc/query conn ["select * from copytest.test"])
+                         (map #(select-keys % columns)))]
+        (is (= (into [] rows-xform data)
+               (into [] rows-xform results)))))))
+
+;; Each date input below is written to the date column and must read back
+;; as the same instant after both sides go through ptime/to-instant.
+(deftest date-inputs-test
+  (let [data [;; LocalDate -> date
+              {:c_date (LocalDate/of 2018 1 2)}
+              ;; java.sql.Date -> date
+              {:c_date (java.sql.Date/valueOf (LocalDate/of 2018 1 2))}
+              ;; Far past dates
+              {:c_date (LocalDate/of 1980 1 2)}
+              {:c_date (java.sql.Date/valueOf (LocalDate/of 1582 10 4))}
+              {:c_date (java.sql.Date/valueOf (LocalDate/of 720 5 19))}
+              {:c_date (LocalDate/of 2 2 2)}
+              ;; Far future dates
+              {:c_date (LocalDate/of 4242 1 2)}]
+        columns (-> data first keys vec)
+        values (into [] (map (apply juxt columns)) data)
+        fixup-row (fn [row]
+                    (-> row
+                        (update :c_date ptime/to-instant)))
+        rows-xform (comp
+                    (map fixup-row)
+                    (map (apply juxt columns)))]
+    (jdbc/with-db-connection [conn conn-spec]
+      (is (= (count data)
+             (copy/copy-into! (:connection conn) :copytest.test columns values)))
+      (let [results (->> (jdbc/query conn ["select * from copytest.test"])
+                         (map #(select-keys % columns)))]
+        (is (= (into [] rows-xform data)
+               (into [] rows-xform results)))))))