diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..8fff04d --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,7 @@ +# Change Log +All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/). + +## [Unreleased] + +## [1.0.0] - 2022-02-03 +- Initial project release diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..2315126 --- /dev/null +++ b/LICENSE @@ -0,0 +1,280 @@ +Eclipse Public License - v 2.0 + + THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE + PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION + OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT. + +1. DEFINITIONS + +"Contribution" means: + + a) in the case of the initial Contributor, the initial content + Distributed under this Agreement, and + + b) in the case of each subsequent Contributor: + i) changes to the Program, and + ii) additions to the Program; + where such changes and/or additions to the Program originate from + and are Distributed by that particular Contributor. A Contribution + "originates" from a Contributor if it was added to the Program by + such Contributor itself or anyone acting on such Contributor's behalf. + Contributions do not include changes or additions to the Program that + are not Modified Works. + +"Contributor" means any person or entity that Distributes the Program. + +"Licensed Patents" mean patent claims licensable by a Contributor which +are necessarily infringed by the use or sale of its Contribution alone +or when combined with the Program. + +"Program" means the Contributions Distributed in accordance with this +Agreement. + +"Recipient" means anyone who receives the Program under this Agreement +or any Secondary License (as applicable), including Contributors. 
+ +"Derivative Works" shall mean any work, whether in Source Code or other +form, that is based on (or derived from) the Program and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. + +"Modified Works" shall mean any work in Source Code or other form that +results from an addition to, deletion from, or modification of the +contents of the Program, including, for purposes of clarity any new file +in Source Code form that contains any contents of the Program. Modified +Works shall not include works that contain only declarations, +interfaces, types, classes, structures, or files of the Program solely +in each case in order to link to, bind by name, or subclass the Program +or Modified Works thereof. + +"Distribute" means the acts of a) distributing or b) making available +in any manner that enables the transfer of a copy. + +"Source Code" means the form of a Program preferred for making +modifications, including but not limited to software source code, +documentation source, and configuration files. + +"Secondary License" means either the GNU General Public License, +Version 2.0, or any later versions of that license, including any +exceptions or additional permissions as identified by the initial +Contributor. + +2. GRANT OF RIGHTS + + a) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free copyright + license to reproduce, prepare Derivative Works of, publicly display, + publicly perform, Distribute and sublicense the Contribution of such + Contributor, if any, and such Derivative Works. + + b) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free patent + license under Licensed Patents to make, use, sell, offer to sell, + import and otherwise transfer the Contribution of such Contributor, + if any, in Source Code or other form. 
This patent license shall + apply to the combination of the Contribution and the Program if, at + the time the Contribution is added by the Contributor, such addition + of the Contribution causes such combination to be covered by the + Licensed Patents. The patent license shall not apply to any other + combinations which include the Contribution. No hardware per se is + licensed hereunder. + + c) Recipient understands that although each Contributor grants the + licenses to its Contributions set forth herein, no assurances are + provided by any Contributor that the Program does not infringe the + patent or other intellectual property rights of any other entity. + Each Contributor disclaims any liability to Recipient for claims + brought by any other entity based on infringement of intellectual + property rights or otherwise. As a condition to exercising the + rights and licenses granted hereunder, each Recipient hereby + assumes sole responsibility to secure any other intellectual + property rights needed, if any. For example, if a third party + patent license is required to allow Recipient to Distribute the + Program, it is Recipient's responsibility to acquire that license + before distributing the Program. + + d) Each Contributor represents that to its knowledge it has + sufficient copyright rights in its Contribution, if any, to grant + the copyright license set forth in this Agreement. + + e) Notwithstanding the terms of any Secondary License, no + Contributor makes additional grants to any Recipient (other than + those set forth in this Agreement) as a result of such Recipient's + receipt of the Program under the terms of a Secondary License + (if permitted under the terms of Section 3). + +3. 
REQUIREMENTS + +3.1 If a Contributor Distributes the Program in any form, then: + + a) the Program must also be made available as Source Code, in + accordance with section 3.2, and the Contributor must accompany + the Program with a statement that the Source Code for the Program + is available under this Agreement, and informs Recipients how to + obtain it in a reasonable manner on or through a medium customarily + used for software exchange; and + + b) the Contributor may Distribute the Program under a license + different than this Agreement, provided that such license: + i) effectively disclaims on behalf of all other Contributors all + warranties and conditions, express and implied, including + warranties or conditions of title and non-infringement, and + implied warranties or conditions of merchantability and fitness + for a particular purpose; + + ii) effectively excludes on behalf of all other Contributors all + liability for damages, including direct, indirect, special, + incidental and consequential damages, such as lost profits; + + iii) does not attempt to limit or alter the recipients' rights + in the Source Code under section 3.2; and + + iv) requires any subsequent distribution of the Program by any + party to be under a license that satisfies the requirements + of this section 3. + +3.2 When the Program is Distributed as Source Code: + + a) it must be made available under this Agreement, or if the + Program (i) is combined with other material in a separate file or + files made available under a Secondary License, and (ii) the initial + Contributor attached to the Source Code the notice described in + Exhibit A of this Agreement, then the Program may be made available + under the terms of such Secondary Licenses, and + + b) a copy of this Agreement must be included with each copy of + the Program. 
+ +3.3 Contributors may not remove or alter any copyright, patent, +trademark, attribution notices, disclaimers of warranty, or limitations +of liability ("notices") contained within the Program from any copy of +the Program which they Distribute, provided that Contributors may add +their own appropriate notices. + +4. COMMERCIAL DISTRIBUTION + +Commercial distributors of software may accept certain responsibilities +with respect to end users, business partners and the like. While this +license is intended to facilitate the commercial use of the Program, +the Contributor who includes the Program in a commercial product +offering should do so in a manner which does not create potential +liability for other Contributors. Therefore, if a Contributor includes +the Program in a commercial product offering, such Contributor +("Commercial Contributor") hereby agrees to defend and indemnify every +other Contributor ("Indemnified Contributor") against any losses, +damages and costs (collectively "Losses") arising from claims, lawsuits +and other legal actions brought by a third party against the Indemnified +Contributor to the extent caused by the acts or omissions of such +Commercial Contributor in connection with its distribution of the Program +in a commercial product offering. The obligations in this section do not +apply to any claims or Losses relating to any actual or alleged +intellectual property infringement. In order to qualify, an Indemnified +Contributor must: a) promptly notify the Commercial Contributor in +writing of such claim, and b) allow the Commercial Contributor to control, +and cooperate with the Commercial Contributor in, the defense and any +related settlement negotiations. The Indemnified Contributor may +participate in any such claim at its own expense. + +For example, a Contributor might include the Program in a commercial +product offering, Product X. That Contributor is then a Commercial +Contributor. 
If that Commercial Contributor then makes performance +claims, or offers warranties related to Product X, those performance +claims and warranties are such Commercial Contributor's responsibility +alone. Under this section, the Commercial Contributor would have to +defend claims against the other Contributors related to those performance +claims and warranties, and if a court requires any other Contributor to +pay any damages as a result, the Commercial Contributor must pay +those damages. + +5. NO WARRANTY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS" +BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR +IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF +TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR +PURPOSE. Each Recipient is solely responsible for determining the +appropriateness of using and distributing the Program and assumes all +risks associated with its exercise of rights under this Agreement, +including but not limited to the risks and costs of program errors, +compliance with applicable laws, damage to or loss of data, programs +or equipment, and unavailability or interruption of operations. + +6. DISCLAIMER OF LIABILITY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS +SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST +PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE +EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + +7. 
GENERAL + +If any provision of this Agreement is invalid or unenforceable under +applicable law, it shall not affect the validity or enforceability of +the remainder of the terms of this Agreement, and without further +action by the parties hereto, such provision shall be reformed to the +minimum extent necessary to make such provision valid and enforceable. + +If Recipient institutes patent litigation against any entity +(including a cross-claim or counterclaim in a lawsuit) alleging that the +Program itself (excluding combinations of the Program with other software +or hardware) infringes such Recipient's patent(s), then such Recipient's +rights granted under Section 2(b) shall terminate as of the date such +litigation is filed. + +All Recipient's rights under this Agreement shall terminate if it +fails to comply with any of the material terms or conditions of this +Agreement and does not cure such failure in a reasonable period of +time after becoming aware of such noncompliance. If all Recipient's +rights under this Agreement terminate, Recipient agrees to cease use +and distribution of the Program as soon as reasonably practicable. +However, Recipient's obligations under this Agreement and any licenses +granted by Recipient relating to the Program shall continue and survive. + +Everyone is permitted to copy and distribute copies of this Agreement, +but in order to avoid inconsistency the Agreement is copyrighted and +may only be modified in the following manner. The Agreement Steward +reserves the right to publish new versions (including revisions) of +this Agreement from time to time. No one other than the Agreement +Steward has the right to modify this Agreement. The Eclipse Foundation +is the initial Agreement Steward. The Eclipse Foundation may assign the +responsibility to serve as the Agreement Steward to a suitable separate +entity. Each new version of the Agreement will be given a distinguishing +version number. 
The Program (including Contributions) may always be +Distributed subject to the version of the Agreement under which it was +received. In addition, after a new version of the Agreement is published, +Contributor may elect to Distribute the Program (including its +Contributions) under the new version. + +Except as expressly stated in Sections 2(a) and 2(b) above, Recipient +receives no rights or licenses to the intellectual property of any +Contributor under this Agreement, whether expressly, by implication, +estoppel or otherwise. All rights in the Program not expressly granted +under this Agreement are reserved. Nothing in this Agreement is intended +to be enforceable by any entity that is not a Contributor or Recipient. +No third-party beneficiary rights are created under this Agreement. + +Exhibit A - Form of Secondary Licenses Notice + +"This Source Code may also be made available under the following +Secondary Licenses when the conditions for such availability set forth +in the Eclipse Public License, v. 2.0 are satisfied: GNU General Public +License as published by the Free Software Foundation, either version 2 +of the License, or (at your option) any later version, with the GNU +Classpath Exception which is available at +https://www.gnu.org/software/classpath/license.html." + + Simply including a copy of this Agreement, including this Exhibit A + is not sufficient to license the Source Code under Secondary Licenses. + + If it is not possible or desirable to put the notice in a particular + file, then You may include the notice in a location (such as a LICENSE + file in a relevant directory) where a recipient would be likely to + look for such a notice. + + You may add additional accurate notices of copyright ownership. diff --git a/dev/user.clj b/dev/user.clj new file mode 100644 index 0000000..17f8830 --- /dev/null +++ b/dev/user.clj @@ -0,0 +1,29 @@ +(ns user + (:require [bq2pg.config :refer [load-config-repl! 
CONFIG]] + [bq2pg.integration :refer [integrate!]] + [bq2pg.bq :refer [create-bq-client]] + [bq2pg.gcs :refer [create-gcs-client]] + [malli.clj-kondo :as mc] + [malli.dev :as md])) + +(-> (mc/collect *ns*) (mc/linter-config)) +(md/start!) + +(load-config-repl! {:filepath "./example/pokemon_integration.edn" + :extra {:export? true + :import? true + :gcs-name "unknown" + :gcs-folder "bq2pg" + :sa-path "/users/akiz/.config/gcp-batch-runner-dev-sa.json" + :db {:dbtype "postgres" + :dbname "postgres" + :user "postgres" + :host "localhost" + :port 5432 + :password "my_password"}}}) + +(def gcs-client (create-gcs-client CONFIG)) +(def bq-client (create-bq-client CONFIG)) + +(defn integrate-dev! [] + (integrate! gcs-client bq-client CONFIG)) diff --git a/example/pokemon_integration.edn b/example/pokemon_integration.edn new file mode 100644 index 0000000..6f00acd --- /dev/null +++ b/example/pokemon_integration.edn @@ -0,0 +1,12 @@ +{ :integrations + [ + { + :name "test" + :query "SELECT * FROM `project.test.pokemons` LIMIT 1000" + :location "EU" + :target-pg-table "public.pokemons" + :method "replace" + :timeout 120 + } + ] + } diff --git a/example_run.sh b/example_run.sh new file mode 100755 index 0000000..9aa4991 --- /dev/null +++ b/example_run.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +CONFIG=$1 + +BQEXPORT=true PGIMPORT=true PGHOST=localhost PGDATABASE=postgres PGPORT=5432 PGUSER=postgres PGPASSWORD=my_password GCSNAME=some-bucket GCSFOLDER=bq2pg lein run $CONFIG diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..a57a759 --- /dev/null +++ b/install.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +cd "$(dirname "$0")" +lein uberjar +cp ./target/bq2pg-1.0.0-standalone.jar ~/bin/bq2pg.jar +echo "#!/bin/bash" > ~/bin/bq2pg +echo "java -jar ~/bin/bq2pg.jar \$1" >> ~/bin/bq2pg +chmod 755 ~/bin/bq2pg diff --git a/project.clj b/project.clj new file mode 100644 index 0000000..7debaab --- /dev/null +++ b/project.clj @@ -0,0 +1,26 @@ +(defproject bq2pg "1.0.0" + :description 
"Integrate BQ data to your Postgresql tables." + :url "http://github.com/akiz/bq2pg" + :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0" + :url "https://www.eclipse.org/legal/epl-2.0/"} + :dependencies [[org.clojure/clojure "1.10.1"] + [com.taoensso/timbre "5.1.2"] + [cprop "0.1.18"] + [expound "0.8.9"] + [com.stuartsierra/component "1.0.0"] + [com.github.seancorfield/next.jdbc "1.2.674"] + [org.postgresql/postgresql "42.2.22"] + [com.oscaro/clj-gcloud-storage "0.71-1.2"] + [babashka/fs "0.0.5"] + [com.google.cloud/google-cloud-bigquery "1.137.1"] + [org.immutant/scheduling "2.1.10"] + [clj-time "0.15.2"] + [com.taoensso/timbre "5.1.2"] + [authenticator "0.1.1"] + [metosin/malli "0.8.0"] + [org.clojure/data.csv "1.0.0"]] + :repl-options {:init-ns user} + :plugins [[lein-cljfmt "0.8.0"]] + :main ^:skip-aot bq2pg.core + :target-path "target/%s" + :profiles {:uberjar {:aot :all}}) diff --git a/readme.org b/readme.org new file mode 100644 index 0000000..2879189 --- /dev/null +++ b/readme.org @@ -0,0 +1,127 @@ +* BQ2PG +- Querying Bigquery can get really expensive and you can not run it locally. +- So I created BQ2PG because "No such thing as a high-priced question." + +** Main Points +[[./resources/images/intro.png]] +- Migrate data from Bigquery to Postgresql easily. +- Migrate data to already existing tables or create new ones automatically. +- Append rows or replace whole table. +- Run export and import individually. + + If there is no new export, no import will run. + + Data are loaded from small GZIP files, keep your disk's space free! + + +** How it works +[[./resources/images/bq2pg.jpg]] + +*** Config +- This is the main configuration file. +- The only mandatory part is =:integrations=, everything else can via provided by environment variables. +- check *example_run.sh* or [[Environment variables]]. 
+ + +*** GCS and BQ clients +- both clients will be initialized with /$BQ2PGSA/ environment variable if provided +- you can also set the path inside the main configuration file under /:sa-path/ key. +- if you do not provide service account key it will try to use /gcloud tools/ login. + +*** Integrations +- Integrations must be created under *:integration* key in the main config file. +- Take a look at example integration in [[./example/pokemon_integration.edn]] +- These are mandatory keys: + + *:name* - transfer name, also used for bucket folder name - must be unique in all configurations! + + *:query* - SQL query definition for data extraction + + *:location* - location of BigQuery dataset + + *:target-pg-table* - target postgre table in format schema_name.table_name + + *:method* - postgre update mode - append (default) or replace (truncate target table before) + + *:timeout* - timeout in seconds, checks on bucket for new export from BQ (if new export does not exist after timeout exceeded, then continue) + +- Bigquery fields are converted to text: + - if your /:target-pg-table/ already exists, it will try to convert value to its column type + - If that table doesn't exist it will be created and every field will be imported as text + +*** Export? +- You can set /:export?/ key to false if you don't want bq2pg to export your data from Bigqeury to Google cloud storage. +- This value can be configured in main config file under /:export?/ key or as /BQEXPORT/ env variable. + +*** Import? +- You can set /:import?/ key to false if you don't want bq2pg to import your data into Postgresql database. +- *Keep in mind:* bq2pg creates record for every last integration in .last_import.edn so if you have already imported some existing export it won't be imported twice. + - it means that it won't be imported until export to the target folder finishes. +- This value can be configured in main config file under /:import?/ key or as /PGIMPORT/ env variable. 
+ +*** Stream GZIP files +- Bigquery data are exported to a batch of GZIP files which are loaded one by one and save space on your disk. + + +** Examples +*** Example 1 - easy and quick +1. Download bq2pg-1.jar from releases +2. create your /integration.edn/ like this: + #+BEGIN_SRC clojure + {:export? true + :import? true + :gcs-name "my-bucket" + :gcs-folder "bq2pg" + :db {:dbtype "postgres" + :dbname "postgres" + :user "postgres" + :host "somehost" + :port 5432 + :password "my_password"} + :integrations [{:name "test" + :query "SELECT * FROM `project.test.pokemons` LIMIT 1000" + :location "EU" + :target-pg-table "public.pokemons" + :method "replace" + :timeout 120}]} + #+END_SRC + +3. run =java -jar bq2pg.jar integration.edn= + +*** Example 2 - clojure developer, installation +1. run ./install.sh +2. bq2pg will be compiled and you can run it by =~/bin/bq2pg= + + or add =$HOME/bin= to your PATH. + +*** Example 3 - clojure developer, interactive use +1. run repl and switch to =user= namespace +2. you can see example configuration + integration you must update before you run *integrate-dev!*. + +*** Example 4 - real use-case +1. Place /bq2pg.jar/ or personalized /example_run.sh/ somewhere in your /PATH/ (see [[Example 2 - clojure developer, installation]]) +2. Configure every environment variable so your config file will contain only integrations. +3. Schedule bq2pg with same env vars but different configs and synchronize Bigquery and Postgresql on daily basis. + +** Environment variables +- if you are into 12factor ;) + +- *$PGHOST* = Name / address of the Postgresql database server you will connect to. +- *$PGDATABASE* = Database name +- *$PGPORT* = Database port +- *$PGUSER* = Database user +- *$PGPASSWORD* = Database password + - this is *OPTIONAL* - i would recommend you to use .pgpass which should be parsed. +- *$BQEXPORT* = true [default] / false + + Disable this only if you want to export once but import multiple times. 
+- *$PGIMPORT* = true / false + + Disable this only if you only want to export data from the environment. +- *$GCSNAME* = bucket name +- *$GCSFOLDER* = directory name on bucket - for bq2pg purposes +- *$BQ2PGSA* = path to service account JSON key + + This is an !optional! parameter, you can also login via =gcloud auth login=... +- If you happen to use a proxy, don't forget to set one up as well: + + *$PROXYHOST* = Adress of proxy server + + *$PROXYUSER* = Proxy user + + *$PROXYPASSWORD* = Proxy password + +** Caveats +1. Your Bigquery dataset and Google Cloud Storage must exist in the same location or you get error. +2. you must have a =bigquery.job.create= rights so you can export data. + - + r/w rights on configured GCS + + +** Development +- There is Malli function schema for every fn, lintering is prepared in *user* namespace - this makes debugging experience much better. + diff --git a/resources/images/bq2pg.jpg b/resources/images/bq2pg.jpg new file mode 100644 index 0000000..938453d Binary files /dev/null and b/resources/images/bq2pg.jpg differ diff --git a/resources/images/intro.png b/resources/images/intro.png new file mode 100644 index 0000000..5eac44c Binary files /dev/null and b/resources/images/intro.png differ diff --git a/src/bq2pg/bq.clj b/src/bq2pg/bq.clj new file mode 100644 index 0000000..61edcba --- /dev/null +++ b/src/bq2pg/bq.clj @@ -0,0 +1,178 @@ +(ns bq2pg.bq + (:require [bq2pg.utils :refer [rand-str]] + [bq2pg.config :refer [Config]] + [malli.core :as m] + [clojure.java.io :as io] + [taoensso.timbre :as timbre]) + (:import (com.google.cloud.bigquery BigQueryOptions + QueryJobConfiguration + ExtractJobConfiguration + TableId + JobId + JobInfo + BigQuery$JobOption) + (com.google.auth.oauth2 ServiceAccountCredentials) + (com.google.common.collect ImmutableMap) + (com.google.cloud RetryOption))) + +(defn load-sa + "Returns 'ServiceAccountCredentials' if there is json key on `sa-path`. + Otherwise returns nil." 
+ [sa-path] + (try + (with-open [sa-stream (io/input-stream sa-path)] + (ServiceAccountCredentials/fromStream sa-stream)) + (catch java.io.FileNotFoundException _ + (timbre/warn "No valid service account provided!")))) + +(def Service-account [:fn #(= (type %) com.google.auth.oauth2.ServiceAccountCredentials)]) + +(m/=> load-sa [:=> + [:cat string?] + [:or nil? Service-account]]) + +(defn create-bq-client + "Creates a reusable bigquery client. + It uses service account on `sa-path` if provided. + Otherwise uses Application Default Credentials." + [{:keys [sa-path]}] + (if-not sa-path + (-> (BigQueryOptions/getDefaultInstance) .getService) + (let [credentials (load-sa sa-path)] + (-> (BigQueryOptions/newBuilder) + (.setCredentials credentials) + .build + .getService)))) + +(def Bq-client + [:fn #(= (type %) com.google.cloud.bigquery.BigQueryImpl)]) + +(m/=> create-bq-client [:=> + [:cat Config] + Bq-client]) + +(defn job->destination-table + "Returns destination table found in bigquery `job`." + [^com.google.cloud.bigquery.Job job] + (let [{:keys [project dataset table]} + (-> (.getConfiguration job) + bean + :destinationTable + bean)] + (format "`%s.%s.%s`" project dataset table))) + +(def Bq-job [:fn #(= (type %) com.google.cloud.bigquery.Job)]) + +(def Destination-table [:re #"\`.*?\..*?\..*\`"]) + +(m/=> job->destination-table [:=> + [:cat Bq-job] + Destination-table]) + +(defn job->results-map + "Takes bigquery `job` and returns its 'destination table'." + [^com.google.cloud.bigquery.Job job] + (-> (.getConfiguration job) + bean + :destinationTable + bean)) + +(def Result-map [:map + [:project string?] + [:dataset string?] + [:table string?]]) + + +(m/=> job->results-map [:=> + [:cat Bq-job] + Result-map]) + +(defn create-query-conf + "Creates bigquery configuration that will be used in a job." 
+ [query-sql query-name] + (-> (QueryJobConfiguration/newBuilder query-sql) + (.setLabels (ImmutableMap/of "query-name" query-name)) + .build)) + +(def Query-conf [:fn #(= (type %) com.google.cloud.bigquery.QueryJobConfiguration)]) + +(m/=> create-query-conf [:=> + [:cat string? string?] + Query-conf]) + +(defn create-job-id + "Create bigquery job-id that will be used in a job." + [job-name dataset-location] + (-> (JobId/newBuilder) + (.setLocation dataset-location) + (.setJob job-name) + .build)) + +(def Job-id [:fn #(= (type %) com.google.cloud.bigquery.AutoValue_JobId)]) + +(m/=> create-job-id [:=> + [:cat string? string?] + Job-id]) + +(defn query->table + "Sends `query-sql` via `bq-client` and sets `query-name` so it can be identified in bigquery history." + [^com.google.cloud.bigquery.BigQueryImpl bq-client query-name query-sql location] + (let [query-config (create-query-conf query-sql query-name) + job-name (format "jobId_%s" (rand-str 12)) + job-id (create-job-id job-name location) + job-info (JobInfo/of job-id query-config)] + (.create bq-client job-info (into-array BigQuery$JobOption [])))) + +(m/=> query->table [:=> + [:cat Bq-client string? string? string?] + Bq-job]) + +(defn job-result->gcs + "Takes `job-result-map` and extracts result via `bq-client` to `gcs-uri`." + [^com.google.cloud.bigquery.BigQueryImpl bq-client job-result-map gcs-uri] + (let [{:keys [project dataset table]} job-result-map + destination-uri (str gcs-uri "/" table "*.gz") + table-id (TableId/of project dataset table) + extract-config (-> (ExtractJobConfiguration/newBuilder table-id destination-uri) + (.setCompression "gzip") + (.setFormat "CSV") + .build) + job-info (JobInfo/of extract-config)] + (.create bq-client job-info (into-array BigQuery$JobOption [])))) + +(m/=> job-result->gcs [:=> + [:cat Bq-client Result-map string?] + Bq-job]) + +(defn run-and-wait! + "Runs provided `bq-job` and waits for its results." 
+ [^com.google.cloud.bigquery.Job bq-job] + (.waitFor bq-job (into-array RetryOption []))) + +(m/=> run-and-wait! [:=> + [:cat Bq-job] + Bq-job]) + +(defn bq-run-query! + "Runs provided `query` as 'bq-job' via `bq-client` as `job-name`." + [^com.google.cloud.bigquery.BigQueryImpl bq-client job-name query location] + (-> (query->table bq-client job-name query location) + run-and-wait! + job->results-map)) + +(m/=> bq-run-query! [:=> + [:cat Bq-client string? string? string?] + Result-map]) + +(defn bq-query->gcs! + "Runs and extracts `query` as gzipped csv to `gcs-uri` via `bq-client` as `job-name`." + [^com.google.cloud.bigquery.BigQueryImpl bq-client job-name query location gcs-uri] + (try (let [job-result (bq-run-query! bq-client job-name query location)] + (-> (job-result->gcs bq-client job-result gcs-uri) + run-and-wait!)) + (catch com.google.cloud.bigquery.BigQueryException e + (timbre/error (-> (bean e) :message))))) + +(m/=> bq-query->gcs! [:=> + [:cat Bq-client string? string? string? string?] + [:or Bq-job nil?]]) diff --git a/src/bq2pg/config.clj b/src/bq2pg/config.clj new file mode 100644 index 0000000..e80777e --- /dev/null +++ b/src/bq2pg/config.clj @@ -0,0 +1,167 @@ +(ns bq2pg.config + (:require + [bq2pg.utils :as utils] + [clojure.string :as string] + [cprop.core :refer [load-config]] + [babashka.fs :as fs] + [malli.core :as m] + [malli.error :as me] + [taoensso.timbre :as timbre])) + + +(def Ne-string + [:and string? [:fn not-empty]]) + +(def Db-conf + [:map + [:dbtype Ne-string] + [:dbname Ne-string] + [:user Ne-string] + [:host Ne-string] + [:port [:or int? 
string?]]]) + +(def Method + [:enum "append" "replace"]) + +(def Table-name + [:re #".*\..*"]) + +(def Location + [:enum "us-central1" "us-west4" "us-west2" "northamerica-northeast1" "us-east4" "us-west1" "us-west3" "southamerica-east1" "southamerica-west1" "us-east1" "northamerica-northeast2" + "europe-west1" "europe-north1" "europe-west3" "europe-west2" "europe-west4" "europe-central2" "europe-west6" "asia-south2" "asia-east2" "asia-southeast2" "australia-southeast2" + "asia-south1" "asia-northeast2" "asia-northeast3" "asia-southeast1" "australia-southeast1" "asia-east1" "asia-northeast1" "EU" "US"]) + +(def Integration + [:map + [:name Ne-string] + [:query Ne-string] + [:location Location] + [:target-pg-table Table-name] + [:method Method] + [:timeout int?]]) + +(def Config + [:map + [:gcs-name Ne-string] + [:gcs-folder Ne-string] + [:db Db-conf] + [:export? boolean?] + [:import? boolean?] + [:integrations [:vector Integration]] + [:sa-path {:optional true} Ne-string]]) + +;; todo - simplify config - no nested maps +(def CONFIG nil) + +(defn- set-config! + "Alters global variable CONFIG by provided `config`." + [config] + (alter-var-root + (var CONFIG) + (constantly config))) + + +(defn- exit + "Exits with provided `code`,`template` and `& args`." + [code template & args] + (let [out (if (zero? code) *out* *err*)] + (binding [*out* out] + (println (apply format template args)))) + (System/exit code)) + + +(defn coerce-config + "Checks and returns provided `config`." + [config] + (cond + (m/validate Config config) + config + :else (let [report (m/explain Config config)] + (exit 1 "%s" (me/humanize report))))) + + +(m/=> coerce-config [:=> + [:cat Config] + Config]) + + +(defn parse-pgpass-pwd + "Retuns pgpass password if there exist same host, port, db and user config. 
" + [s host port db user] + (let [[chost cport cdb cuser cpwd] (string/split s #":")] + (when (and (= host chost) (= port cport) (= db cdb) (= user cuser)) + cpwd))) + +(defn load-db-pwd [host port db user] + (if-let [dbpwd (System/getenv "PGPASSWORD")] + dbpwd + (if-let [pgpass (slurp + (fs/file + (System/getProperty "user.home") + ".pgpass"))] + (let [creds (string/split pgpass #"\n")] + (some #(parse-pgpass-pwd % host port db user) creds)) + (timbre/warn "DB pwd not found in ~/.pgpass or $PGPASSWORD env var.")))) + +(defn load-env + "Loads various environment variables and removes any nil values. " + [] + (utils/remove-empty-vals + {:proxy-host (System/getenv "PROXYHOST") + :proxy-user (System/getenv "PROXYUSER") + :proxy-password (System/getenv "PROXYPASSWORD") + :export? (Boolean/valueOf (if-let [export? (System/getenv "BQEXPORT")] + export? "true")) + :import? (Boolean/valueOf (if-let [import? (System/getenv "PGIMPORT")] + import? "true")) + :gcs-name (System/getenv "GCSNAME") + :gcs-folder (System/getenv "GCSFOLDER") + :db {:dbtype "postgres" + :host (System/getenv "PGHOST") + :dbname (System/getenv "PGDATABASE") + :port (System/getenv "PGPORT") + :user (System/getenv "PGUSER") + :password (load-db-pwd (System/getenv "PGHOST") + (System/getenv "PGPORT") + (System/getenv "PGDATABASE") + (System/getenv "PGUSER"))} + :sa-path (System/getenv "BQ2PGSA")})) + + +(defn- fake-exit + "Like exit.. but without exiting REPL." + [_ template & args] + (let [message (apply format template args)] + (throw (new Exception ^String message)))) + +(defn load-config! + "Validates and loads CONFIG on `filepath`." + [{filepath :filepath extra :extra}] + (try + (-> (load-config :file filepath :merge [(load-env) extra]) + (coerce-config) + (set-config!)) + (catch Exception e + (timbre/debug e) + (timbre/error "Wrong configuration file provided!")))) + +(m/=> load-config! [:=> + [:cat + [:map + [:filepath string?] + [:extra {:optional true} + [:map + [:export? 
{:optional true} boolean?]
+                        [:import? {:optional true} boolean?]
+                        [:gcs-name {:optional true} string?]
+                        [:gcs-folder {:optional true} string?]
+                        [:db {:optional true} Db-conf]
+                        [:integrations {:optional true} [:vector Integration]]
+                        [:sa-path {:optional true} Ne-string]]]]]
+                    [:or Config nil?]])
+
+(defn load-config-repl!
+  "Validates and loads CONFIG on `filepath` - uses fake exit."
+  [config-map]
+  (with-redefs [exit fake-exit]
+    (load-config! config-map)))
diff --git a/src/bq2pg/core.clj b/src/bq2pg/core.clj
new file mode 100644
index 0000000..3271fe9
--- /dev/null
+++ b/src/bq2pg/core.clj
@@ -0,0 +1,15 @@
+(ns bq2pg.core
+  (:require
+   [bq2pg.bq :refer [create-bq-client]]
+   [bq2pg.config :refer [CONFIG load-config!]]
+   [bq2pg.gcs :refer [create-gcs-client]]
+   [bq2pg.integration :refer [integrate!]]
+   [bq2pg.proxy :refer [set-proxy!]])
+  (:gen-class))
+
+(defn -main [& args]
+  (load-config! {:filepath (first args)})
+  (let [gcs-client (create-gcs-client CONFIG)
+        bq-client (create-bq-client CONFIG)]
+    (set-proxy! CONFIG)
+    (integrate! gcs-client bq-client CONFIG)))
diff --git a/src/bq2pg/db.clj b/src/bq2pg/db.clj
new file mode 100644
index 0000000..f26d87c
--- /dev/null
+++ b/src/bq2pg/db.clj
@@ -0,0 +1,161 @@
+(ns bq2pg.db
+  (:require
+   [clojure.string :as str]
+   [clojure.java.io :as io]
+   [next.jdbc :as jdbc]
+   [next.jdbc.date-time]
+   [clojure.data.csv :as csv]
+   [bq2pg.config :refer [Table-name]]
+   [malli.core :as m]
+   [taoensso.timbre :as timbre]
+   [next.jdbc.types
+    :as types]))
+
+(defn connectable?
+  "Returns true if `conn` can be used by JDBC to initialize connection
+  otherwise returns false.
+  `Conn` can be map or 'org.postgresql.jdbc.PgConnection'"
+  [conn]
+  (or (map? conn)
+      (= (type conn)
+         org.postgresql.jdbc.PgConnection)))
+
+(def Connectable [:fn connectable?])
+
+(defn- get-type
+  "Gets Postgresql type for selected `column-name` in `table` with jdbc based on `connectable`."
+  [connectable table-name column-name]
+  (let [query "SELECT data_type FROM information_schema.columns where table_schema = '%s' and table_name = '%s' and upper(column_name) = '%s'"
+        [schema table] (str/split (name table-name) #"\.")
+        col-name (str/upper-case column-name)]
+    (:columns/data_type (jdbc/execute-one!
+                         connectable
+                         [(format query schema table col-name)]))))
+
+(m/=> get-type [:=>
+                [:cat Connectable Table-name string?]
+                [:enum "bool" "bit" "int8" "bigserial" "oid" "bytea" "char" "bpchar" "numeric" "int4" "serial" "int2" "smallserial"
+                 "float4" "float8" "money" "name" "text" "character varying" "varchar" "date" "time" "timetz" "timestamp" "timestampz"]])
+
+(defn convert-type
+  "Convert Postgresql `type` to conversion function"
+  [type]
+  (case type
+    ("bool" "bit") #(types/as-bit %)
+    ("int8" "bigserial" "oid") #(types/as-bigint %)
+    "bytea" #(types/as-binary %)
+    ("char" "bpchar") #(types/as-char %)
+    "numeric" #(types/as-numeric %)
+    ("int4" "serial") #(types/as-integer %)
+    ("int2" "smallserial") #(types/as-smallint %)
+    "float4" #(types/as-real %)
+    ("float8" "money") #(types/as-double %)
+    ("name", "text", "character varying", "varchar") #(types/as-varchar %)
+    "date" #(try (.parse (java.text.SimpleDateFormat. "yyyy-MM-dd") %)
+                 (catch Exception _ nil))
+    ("time", "timetz") #(try (types/as-time %)
+                             (catch Exception _ nil))
+    ("timestamp", "timestampz") #(try (types/as-timestamp %)
+                                      (catch Exception _ nil))
+    #(types/as-other %)))
+
+(m/=> convert-type [:=>
+                    [:cat [:enum "bool" "bit" "int8" "bigserial" "oid" "bytea" "char" "bpchar" "numeric" "int4" "serial" "int2" "smallserial"
+                           "float4" "float8" "money" "name" "text" "character varying" "varchar" "date" "time" "timetz" "timestamp" "timestampz"]]
+                    fn?])
+
+(def Header [:vector string?])
+
+(defn- get-conv-fns
+  "Takes `header` and maps every column in `table` to matched conversion functions."
+ [connectable table-name header] + (mapv #(convert-type (get-type connectable table-name %)) header)) + +(m/=> get-conv-fns [:=> + [:cat Connectable Table-name Header] + vector?]) + +(defn- build-query + "Takes `table` and `header` and build sql query template for next.jdbc" + [table header] + (let [cols (str/join "," (map #(format "\"%s\"" %) header)) + placeholders (str/join "," (for [_ header] "?")) + query (format "insert into %s (%s) values (%s)" (name table) cols placeholders)] + query)) + +(m/=> build-query [:=> + [:cat Table-name Header] + string?]) + +(defn- get-header + "Takes first row from `coll` as a header + which is splitted on a `delimiter`. " + [coll delimiter] + (->> (str/split (first coll) + (re-pattern delimiter)) + (mapv str))) + +(m/=> get-header [:=> + [:cat [:sequential string?] string?] + Header]) + +(defn truncate-table! + "Tries to reuse or create jdbc connection based on `connectable` and truncate `table`. + `Connectable` can be map or `org.postgresql.jdbc.PgConnection. + `Table` is a string in 'schema.table' form." + [connectable table-name] + (try + (jdbc/execute! connectable [(format "truncate table %s;" (name table-name))]) + (timbre/info (format "Table: %s truncated!" table-name)) + (catch Exception _ + (timbre/warn (format "Table: %s can't be truncated!" table-name)) + ))) + +(m/=> truncate-table! [:=> + [:cat Connectable Table-name] + any?]) + +(defn table-exists? + "Checks if `table` exists in db with jdbc based on `connectable`." + [connectable table-name] + (let [[schema table] (str/split table-name #"\.") + query (format "select tablename from pg_tables where schemaname='%s' and tablename='%s';" schema table)] + (-> (jdbc/execute! connectable [query]) + seq))) + +(m/=> table-exists? [:=> + [:cat Connectable Table-name] + [:or seq? nil?]]) + +(defn create-table! 
[con table-name table-header] + (let [header (str/join " text," (map #(format "\"%s\"" %) table-header)) + + query (format "CREATE TABLE %s (%s text);" table-name header)] + (timbre/info query ) + (jdbc/execute! con [query]))) + +(defn insert-csv-from-stdin! + "Inserts csv from `stdin` parsed with `delimiter` to `table` via next.jdbc `con`." + [stdin con table-name delimiter] + (with-open [reader (io/reader stdin)] + (let [lines (line-seq reader) + table-header (get-header lines delimiter)] + (when-not (table-exists? con table-name) + (create-table! con table-name table-header)) + (doall + (let [query (build-query table-name table-header) + conv-fns (get-conv-fns con table-name table-header)] + (doseq [batch (partition-all 100000 (rest lines))] + (try + (jdbc/execute-batch! con query + (->> batch + (mapcat csv/read-csv) + (mapv (fn [row] (map #(%1 %2) conv-fns row)))) + {:reWriteBatchedInserts true}) + (catch Exception e + (timbre/debug e)) + ))))))) + +(m/=> insert-csv-from-stdin! [:=> + [:cat any? Connectable Table-name string?] + [:or seq? nil?]]) diff --git a/src/bq2pg/gcs.clj b/src/bq2pg/gcs.clj new file mode 100644 index 0000000..47a9b8a --- /dev/null +++ b/src/bq2pg/gcs.clj @@ -0,0 +1,107 @@ +(ns bq2pg.gcs + (:require + [taoensso.timbre :as timbre] + ;; [bq2pg.config :refer ] + [clojure.string :as str] + [clj-gcloud.storage :as st] + [clj-gcloud.coerce :as cr] + [clj-time.coerce :as tc] + [babashka.fs :as fs] + [malli.core :as m])) + +(def Gcs-client + [:fn #(= (type %) com.google.cloud.storage.StorageImpl)]) + +(def Gcs-uri + [:re #"gs:\/\/.*"]) + +(def Gzip-uri + [:re #"gs:\/\/.*\.gz"]) + +(def Gzip-uris + [:and + [:not empty?] + [:sequential Gzip-uri]]) + +(defn create-gcs-client + "Creates a reusable google cloud storage client. + It uses GOOGLE_APPLICATION_CREDENTIALS if no SA provided. 
+ You can set this by running 'gcloud auth application-default login'" + [{:keys [sa-path]}] + (if-not sa-path + (st/init {}) + (st/init {:credentials sa-path}))) + +(m/=> create-gcs-client [:=> + [:cat [:or map? nil?]] + Gcs-client]) + +(defn list-gzip-uris + "Returns coll of GCS blobs in gs-uri" + [storage-client gcs-uri] + (->> (st/ls storage-client gcs-uri {}) + (filter #(str/ends-with? (.getName %) ".gz")) + (map #(format "gs://%s/%s" + (.getBucket %) + (.getName %))))) + +(m/=> list-gzip-uris [:=> + [:cat Gcs-client Gcs-uri] + Gzip-uris]) + +(defn- gcs-uri->temp-uri + "Returns temporary path for file on `gcs-uri`." + [gcs-uri] + (->> (fs/file-name gcs-uri) + (fs/file (fs/temp-dir)) + .toString)) + +(m/=> gcs-uri->temp-uri [:=> + [:cat Gcs-uri] + string?]) + +(defn download-blob! + [storage-client gcs-uri] + (let [target-path (gcs-uri->temp-uri gcs-uri)] + (st/download-file-from-storage storage-client + gcs-uri + target-path) + target-path)) + +(m/=> download-blob! [:=> + [:cat Gcs-client Gcs-uri] + string?]) +;; todo - move this try catch otusdie as download-blob etc. should be also checked and +(defn delete-folder! + [storage-client gcs-uri] + (doseq [blob (st/ls storage-client gcs-uri)] + (let [{:keys [blob-id]} (cr/->clj blob) + path (format "gs://%s/%s" (:bucket blob-id) (:name blob-id))] + (timbre/info (format "Deleting blob on: %s" path)) + (st/delete-blob storage-client (st/->blob-id path))))) + +(m/=> delete-folder! 
[:=> + [:cat Gcs-client Gcs-uri] + any?]) + +(defn create-empty-blob [storage-client gcs-uri] + (st/create-blob storage-client (st/blob-info (st/->blob-id gcs-uri) {}))) + +(m/=> create-empty-blob [:=> + [:cat Gcs-client Gcs-uri] + any?]) + +(defn get-blob-moddate [storage-client blob-uri] + (try (->> (st/->blob-id blob-uri) + (st/get-blob storage-client) + .getCreateTime + tc/from-long) + (catch Exception e + (timbre/error e) + nil))) + + (m/=> get-blob-moddate [:=> + [:cat Gcs-client Gcs-uri] + [:or + [:fn #(= (type %) org.joda.time.DateTime)] + nil?]]) diff --git a/src/bq2pg/integration.clj b/src/bq2pg/integration.clj new file mode 100644 index 0000000..7abf0b2 --- /dev/null +++ b/src/bq2pg/integration.clj @@ -0,0 +1,127 @@ +(ns bq2pg.integration + (:require + [bq2pg.db :as db :refer [Connectable]] + [bq2pg.gcs :as gcs :refer [Gcs-uri Gzip-uris Gcs-client]] + [bq2pg.config :refer [Method Config Integration]] + [bq2pg.bq :as bq :refer [Bq-client]] + [clj-time.coerce :as tc] + [bq2pg.utils :refer [gcs-directory + update-local-state + read-last-state + before? + interval>]] + [babashka.fs :as fs] + [malli.core :as m] + [taoensso.timbre :as timbre] + [clojure.java.io :as io] + [next.jdbc :as jdbc])) + +(defn stream-gzip-to-db! + "Streams content of GZIPPED csv into table in CONNECTABLE." + [gzip-path delimiter con table-name] + (with-open [in (-> gzip-path + io/input-stream + java.util.zip.GZIPInputStream. + java.io.InputStreamReader. + java.io.BufferedReader.)] + (timbre/info (format "Streaming %s to %s" gzip-path table-name)) + (db/insert-csv-from-stdin! in con table-name delimiter))) + +(m/=> stream-gzip-to-db! [:=> + [:cat string? string? Connectable string?] + any?]) + +(defn stream-gcs-uri-to-db! + "Downloads and streams all GZIPPED files in GCS-FOLDER into table in CONNECTABLE. + It can truncate table or append data - based on selected method which is 'append' or 'replace'." 
+ [gcs-client connectable gcs-folder-uri target-pg-table method] + (let [gzip-uris (gcs/list-gzip-uris gcs-client gcs-folder-uri) + table-exists? (db/table-exists? connectable target-pg-table)] + (if (m/validate Gzip-uris gzip-uris) + (try (jdbc/with-transaction [con (jdbc/get-connection connectable)] + (when (and table-exists? (= method "replace")) + (db/truncate-table! con target-pg-table)) + (doseq [uri gzip-uris] + (timbre/info (format "Downloading file from: %s" uri)) + (let [blob (gcs/download-blob! gcs-client uri)] + (stream-gzip-to-db! blob "," con target-pg-table) + (timbre/info "Gzip has been loaded into DB") + (fs/delete-if-exists blob))) + (timbre/info "Job finished!")) + (catch org.postgresql.util.PSQLException e + (prn e))) + (timbre/warn (format "No files in %s - nothing to integrate!" gcs-folder-uri))))) + +(m/=> stream-gcs-uri-to-db! [:=> + [:cat Gcs-client Connectable Gcs-uri string? Method] + any?]) + +(defn export! + "Exports compressed Bigquery data defined in integration to GCS defined in CONFIG." + [gcs-client bq-client gcs-name gcs-folder integration] + (let [{:keys [name query location]} integration + gcs-uri (gcs-directory gcs-name gcs-folder name) + gcs-state-uri (gcs-directory gcs-name gcs-folder name "EXPORTED")] + (try + (gcs/delete-folder! gcs-client gcs-uri) + (timbre/info (format "Exporting BQ result of following query: %s" query)) + (if (bq/bq-query->gcs! bq-client name query location gcs-uri) + (do (gcs/create-empty-blob gcs-client gcs-state-uri) + {:status 1}) + {:status 0}) + (catch com.google.cloud.storage.StorageException e + (timbre/fatal e) + (System/exit 0))))) + +(def Status [:map [:status [:enum 0 1]]]) + +(m/=> export! [:=> + [:cat Gcs-client Bq-client string? string? Integration] + Status]) + +(defn import! + "Imports compressed CSV files from GCS to database." + ([gcs-client connectable gcs-name gcs-folder integration] + (import! 
gcs-client connectable gcs-name gcs-folder integration false)) + ([gcs-client connectable gcs-name gcs-folder integration force?] + (let [{:keys [name target-pg-table timeout method]} integration + gcs-uri (gcs-directory gcs-name gcs-folder name) + gcs-state-uri (gcs-directory gcs-name gcs-folder name "EXPORTED")] + (loop [start-time (.getTime (new java.util.Date)) + cur-time start-time] + (if-let [last-update (gcs/get-blob-moddate gcs-client gcs-state-uri)] + (let [last-import (read-last-state name)] + (if (or force? ;not implemented yet! + (before? last-import last-update)) + (do (timbre/info (format "Finding and importing gzipped files from: %s to: postgresql table %s" gcs-uri target-pg-table)) + (stream-gcs-uri-to-db! gcs-client connectable gcs-uri target-pg-table method) + (update-local-state name (tc/from-long last-update))) + (timbre/info (format "No new update for: %s since %s" name (tc/from-long last-update))))) + (if (interval> start-time cur-time timeout) + (timbre/info (format "Timeout for %s has reached its limit: %s secs." name timeout)) + (do (timbre/info "Waiting for 15 seconds before checking GCS again.") + (Thread/sleep 15000) + (recur start-time + (.getTime (new java.util.Date)))))))))) + +(m/=> import! + [:function + [:=> [:cat Gcs-client Connectable string? string? Integration] any?] + [:=> [:cat Gcs-client Connectable string? string? Integration boolean?] any?]]) + +(defn integrate! + "Runs Export, Import or both - based on provided CONFIG." + [gcs-client bq-client config] + (let [{:keys [db gcs-name gcs-folder integrations import? export?]} config + status (atom {:status 1})] + (doseq [x integrations] + (timbre/info (format "Starting integration of: %s." (:name x))) + (when export? + (->> (export! gcs-client bq-client gcs-name gcs-folder x) + (reset! status))) + (when (and (= 1 (:status @status)) import?) + (import! gcs-client db gcs-name gcs-folder x))))) + +(m/=> integrate! 
[:=> + [:cat Gcs-client Bq-client Config] + nil?]) diff --git a/src/bq2pg/proxy.clj b/src/bq2pg/proxy.clj new file mode 100644 index 0000000..a76d081 --- /dev/null +++ b/src/bq2pg/proxy.clj @@ -0,0 +1,17 @@ +(ns bq2pg.proxy + (:require + [authenticator.core :as auth])) + +(defn get-credentials + "Returns a proxy user and proxy password in your config file." + [proxy-cfg options] + (let [{:keys [proxy-host + proxy-user + proxy-password]} proxy-cfg] + (when (= (:host options) proxy-host) + [proxy-user proxy-password]))) + +(defn set-proxy! + "Sets a HTTP proxy and only if there is a proxy configuration inside your config file." + [config] + (auth/set-default-authenticator (partial get-credentials config))) diff --git a/src/bq2pg/utils.clj b/src/bq2pg/utils.clj new file mode 100644 index 0000000..d8affa8 --- /dev/null +++ b/src/bq2pg/utils.clj @@ -0,0 +1,37 @@ +(ns bq2pg.utils + (:require [babashka.fs :as fs] + [clj-time.core :as t])) + +(defn gcs-directory [bucket-name & opts] + (format "gs://%s/%s" bucket-name (.toString (apply fs/file opts)))) + +(defn load-local-state [] + (try + (read-string + (slurp ".last_import.edn")) + (catch Exception _ {}))) + +(defn update-local-state [kw value] + (let [state (load-local-state)] + (->> (assoc state kw value) + (spit ".last_import.edn")))) + +(defn remove-empty-vals [m] + (into {} (remove (comp nil? second) m))) + +(defn read-last-state [kw] + (let [state (load-local-state)] + (get state kw nil))) + +(defn rand-str [len] + (apply str (take len (repeatedly #(char (+ (rand 26) 65)))))) + +(defn before? [last-import-ts last-update-ts] + (if (nil? last-import-ts) + true + (not (or (t/after? last-import-ts last-update-ts) + (t/equal? 
last-import-ts last-update-ts))))) + +(defn interval> [start-ms end-ms timeout-s] + (> (- end-ms start-ms) (* 1000 timeout-s))) + diff --git a/test/bq2pg/bq_test.clj b/test/bq2pg/bq_test.clj new file mode 100644 index 0000000..d4a6f47 --- /dev/null +++ b/test/bq2pg/bq_test.clj @@ -0,0 +1,19 @@ +(ns bq2pg.bq-test + (:require [clojure.test :refer [deftest testing is]] + [bq2pg.bq :refer [load-sa create-query-conf create-job-id]])) + +(deftest load-sa-test + (testing "loading wrong service account" + (is (= nil (load-sa "no.file"))))) + +(deftest create-query-conf-test + (testing "creating query job configuration" + (is (= com.google.cloud.bigquery.QueryJobConfiguration + (type + (create-query-conf "testing-query" "testing-name")))))) + +(deftest create-job-id-test + (testing "creating job" + (is (= com.google.cloud.bigquery.AutoValue_JobId + (type + (create-job-id "testing-name" "EU")))))) diff --git a/test/bq2pg/config_test.clj b/test/bq2pg/config_test.clj new file mode 100644 index 0000000..cf8a31d --- /dev/null +++ b/test/bq2pg/config_test.clj @@ -0,0 +1,48 @@ +(ns bq2pg.config-test + (:require [clojure.test :refer [deftest testing is]] + [bq2pg.config :refer [coerce-config]])) + +(def testing-config + {:gcs-name "testing-bucket" + :gcs-folder "testing-folder" + :db {:dbtype "postgres" + :dbname "testing-database" + :user "testing-user" + :host "testing-host" + :port 5432} + :export? false + :import? false + :integrations [{:name "testing-integration" + :location "EU" + :query "select * from testing-dataset.testing-table" + :target-pg-table "testing.table" + :timeout 10 + :method "replace"}]}) + +(deftest coerce-config-test + (testing "Testing coerce config" + (is (= (coerce-config testing-config) + testing-config)))) + +(def testing-config-with-sa + {:gcs-name "testing-bucket" + :gcs-folder "testing-folder" + :db {:dbtype "postgres" + :dbname "testing-database" + :user "testing-user" + :host "testing-host" + :port 5432} + :export? false + :import? 
false + :integrations [{:name "testing-integration" + :location "EU" + :query "select * from testing-dataset.testing-table" + :target-pg-table "testing.table" + :timeout 10 + :method "replace"}] + :sa-path "/home/nonexistingsa.json"}) + +(deftest coerce-config-test-with-sa + (testing "Testing coerce config" + (is (= (coerce-config testing-config-with-sa) + testing-config-with-sa)))) diff --git a/test/bq2pg/core_test.clj b/test/bq2pg/core_test.clj new file mode 100644 index 0000000..6a799bb --- /dev/null +++ b/test/bq2pg/core_test.clj @@ -0,0 +1,8 @@ +(ns bq2pg.core-test + (:require [clojure.test :refer :all] + [bq2pg.core :refer :all])) + +(deftest a-test + (testing "FIXME, I fail." + (is (= 1 1)))) + diff --git a/test/bq2pg/proxy_test.clj b/test/bq2pg/proxy_test.clj new file mode 100644 index 0000000..1a7eee7 --- /dev/null +++ b/test/bq2pg/proxy_test.clj @@ -0,0 +1,12 @@ +(ns bq2pg.proxy-test + (:require [clojure.test :refer [deftest testing is]] + [bq2pg.proxy :refer [get-credentials]])) + +(deftest get-credentials-test + (testing "get username and password for proxy" + (let [proxy-cfg {:proxy-host "host" + :proxy-user "user" + :proxy-password "password"} + options {:host "host"}] + (is (= (get-credentials proxy-cfg options) + ["user" "password"]))))) diff --git a/test/bq2pg/utils_test.clj b/test/bq2pg/utils_test.clj new file mode 100644 index 0000000..48875a8 --- /dev/null +++ b/test/bq2pg/utils_test.clj @@ -0,0 +1,39 @@ +(ns bq2pg.utils-test + (:require [clojure.test :refer [deftest testing is]] + [bq2pg.utils :refer [gcs-directory load-local-state + read-last-state + rand-str + before? 
+                                      interval>]]
+            [clj-time.core :as t]))
+
+(deftest gcs-directory-test
+  (testing "creating gcs-directory path"
+    (is
+     (= "gs://testing-bucket/path/to/file"
+        (gcs-directory "testing-bucket" "path" "to" "file")))))
+
+(deftest load-local-state-test
+  (testing "loading local state file"
+    (is (= clojure.lang.PersistentArrayMap
+           (type (load-local-state))))))
+
+(deftest read-local-state-test
+  (testing "reading a value from local state file"
+    (is (nil? (read-last-state :non-existing-key)))))
+
+(deftest rand-str-test
+  (testing "generating random string of len 12"
+    (let [random-string (rand-str 12)]
+      (is (and (string? random-string)
+               (= 12 (count random-string)))))))
+
+(deftest before-test
+  (testing "date before"
+    (let [date (t/now)]
+      (is (= false (before? date date))))))
+
+(deftest interval>-test
+  (testing "bigger interval"
+    (is (true? (interval> 1000 2100 1)))
+    (is (false? (interval> 1000 2000 1)))))